- How do I stream changes from PostgreSQL into Antfly?
- How do I set up CDC replication with Cloud SQL, AlloyDB, Supabase, Neon, or RDS?
- How do I control which PostgreSQL columns map to Antfly document fields?
- How do I handle soft deletes or document deletion on PostgreSQL DELETE?
Antfly has built-in Change Data Capture (CDC) that streams real-time changes from PostgreSQL tables into Antfly documents using logical replication. No external daemon or sync process is required: just add a `replication_sources` block when creating a table.
Architecture
```text
┌───────────────────┐
│    PostgreSQL     │
│  (source table)   │
└─────────┬─────────┘
          │  WAL (logical replication)
          │  pgoutput protocol v2
          │
┌─────────▼─────────┐
│  Antfly Metadata  │
│      Leader       │
│ ┌───────────────┐ │            ┌──────────────┐
│ │  CDC Worker   │─┼───────────▶│    Antfly    │
│ │ (per source)  │ │ transform  │    Shard     │
│ └───────────────┘ │            └──────────────┘
└───────────────────┘
```

Each replication source gets a dedicated worker that:

- Connects to PostgreSQL using the `pgoutput` logical decoding plugin
- Creates a replication slot and publication (or reuses pre-created ones)
- Streams WAL changes and converts them to Antfly transform operations
- Persists LSN checkpoints in the metadata Raft store for crash recovery
- Reconnects automatically with exponential backoff on transient errors
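The worker responsibilities listed above can be sketched as a small resume-and-retry loop. This is a minimal Python sketch under stated assumptions, not Antfly's implementation: `run_worker`, `stream`, `apply`, and the in-memory `checkpoint` dict are hypothetical stand-ins for the pgoutput connection, the transform step, and the Raft-backed LSN store. The LSN helpers follow PostgreSQL's textual `X/Y` format (two 32-bit hex halves).

```python
import time
from typing import Callable, Iterable, Iterator, Tuple

def parse_lsn(text: str) -> int:
    """Convert PostgreSQL's textual LSN ("X/Y", two 32-bit hex halves) to an int."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def format_lsn(lsn: int) -> str:
    """Inverse of parse_lsn, using PostgreSQL's uppercase hex "X/Y" form."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"

def backoff_delays(base: float = 0.5, cap: float = 30.0) -> Iterator[float]:
    """Exponential backoff: base, 2*base, 4*base, ... capped at `cap` seconds."""
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)

# Hypothetical stream: yields (lsn, change) pairs for changes after start_lsn.
Stream = Callable[[int], Iterable[Tuple[int, object]]]

def run_worker(stream: Stream, apply: Callable[[object], None], checkpoint: dict,
               max_attempts: int = 5,
               sleep: Callable[[float], None] = time.sleep) -> None:
    """Apply changes in order, checkpointing each LSN; retry on ConnectionError."""
    delays = backoff_delays()
    failures = 0
    while True:
        progressed = False
        try:
            # 0 means "no checkpoint yet" (the server then uses the slot's position).
            for lsn, change in stream(checkpoint.get("lsn", 0)):
                apply(change)
                checkpoint["lsn"] = lsn  # persist only after the change is applied
                progressed = True
            return  # stream ended cleanly
        except ConnectionError:
            if progressed:
                # The connection worked for a while: start the backoff over.
                delays, failures = backoff_delays(), 0
            failures += 1
            if failures >= max_attempts:
                raise
            sleep(next(delays))
```

Because the checkpoint is written only after a change is applied, a crash between the two replays that change on restart (at-least-once delivery); idempotent operations such as `$set` and `$unset` tolerate the replay.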
Prerequisites
- PostgreSQL 15+ (for pgoutput protocol v2)
- `wal_level = logical` on the source database
- A database user with the `REPLICATION` attribute (or equivalent managed-service role)
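The prerequisites can be checked before creating the table. The sketch below is a hypothetical Python helper that takes values you would read with `SHOW server_version_num;`, `SHOW wal_level;`, and `SELECT rolreplication FROM pg_roles WHERE rolname = current_user;`. It only checks the self-hosted `REPLICATION` attribute; on managed services the equivalent is membership in a provider role (for example `rds_replication`), which this sketch does not inspect.

```python
from typing import List

MIN_SERVER_VERSION_NUM = 150000  # PostgreSQL 15.0, per the prerequisites above

def check_prerequisites(server_version_num: int, wal_level: str,
                        has_replication_attr: bool) -> List[str]:
    """Return human-readable problems; an empty list means every check passed."""
    problems: List[str] = []
    if server_version_num < MIN_SERVER_VERSION_NUM:
        problems.append(f"PostgreSQL 15+ required, found server_version_num={server_version_num}")
    if wal_level != "logical":
        problems.append(f"wal_level must be 'logical', found {wal_level!r}")
    if not has_replication_attr:
        problems.append("user lacks the REPLICATION attribute (or a managed-service replication role)")
    return problems
```

For example, a PostgreSQL 14.10 server (`server_version_num` 140010) with `wal_level = replica` and an ordinary user reports all three problems.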
Quick Start
1. Enable logical replication on PostgreSQL
```sql
-- Self-hosted PostgreSQL
ALTER SYSTEM SET wal_level = logical;
-- Then restart PostgreSQL
```

For managed services, see Cloud Provider Setup below.
2. Create the source table in PostgreSQL
```sql
CREATE TABLE users (
  id    TEXT PRIMARY KEY,
  name  TEXT,
  email TEXT,
  score INT
);
```

3. Create an Antfly table with a replication source
```bash
curl -X POST http://localhost:8080/api/v1/table \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{
    "name": "users",
    "replication_sources": [
      {
        "type": "postgres",
        "dsn": "postgres://user:pass@pg-host:5432/mydb?sslmode=require",
        "postgres_table": "users",
        "key_template": "id"
      }
    ]
  }'
```

That's it. Antfly automatically creates a replication slot and publication, then starts streaming changes. Any INSERT, UPDATE, or DELETE on the PostgreSQL `users` table is reflected in the Antfly `users` table.
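The same request can be issued from code. This is a minimal sketch using only Python's standard library; `build_create_table_request` is a hypothetical helper, and the endpoint, Basic-auth credentials, and body shape are copied from the `curl` example above. It builds the request without sending it.

```python
import base64
import json
import urllib.request

def build_create_table_request(base_url: str, user: str, password: str,
                               table: str, source: dict) -> urllib.request.Request:
    """Build POST /api/v1/table with one replication source (not sent here)."""
    body = {"name": table, "replication_sources": [source]}
    # curl's -u user:pass sends HTTP Basic auth; build the same header by hand.
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{base_url}/api/v1/table",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )

req = build_create_table_request(
    "http://localhost:8080", "admin", "admin", "users",
    {
        "type": "postgres",
        "dsn": "postgres://user:pass@pg-host:5432/mydb?sslmode=require",
        "postgres_table": "users",
        "key_template": "id",
    },
)
```

Send it with `urllib.request.urlopen(req)` once Antfly is reachable. Outside a quick start, reference a keystore secret in `dsn` instead of embedding credentials.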
4. Verify replication
```sql
-- Insert a row in PostgreSQL
INSERT INTO users (id, name, email, score) VALUES ('user-1', 'Alice', 'alice@example.com', 100);
```

```bash
# Check it appears in Antfly
curl -s http://localhost:8080/api/v1/table/users/key/user-1 -u admin:admin | jq .
```

Configuration Reference
Replication Source Fields
| Field | Required | Default | Description |
|---|---|---|---|
| `type` | Yes | none | Must be `"postgres"` |
| `dsn` | Yes | none | PostgreSQL connection string. Supports `${secret:key}` references. |
| `postgres_table` | Yes | none | Source table name in PostgreSQL |
| `key_template` | No | `"id"` | How to derive the Antfly document key: a plain column name or a `{{col}}` template. |
| `slot_name` | No | auto-derived | Replication slot name. Set this when using a pre-created slot. |
| `publication_name` | No | auto-derived | Publication name. Set this when using a pre-created publication. |
| `on_update` | No | passthrough | Transform ops for INSERT/UPDATE events |
| `on_delete` | No | auto-derived | Transform ops for DELETE events |
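The `key_template` field accepts either a plain column name or a `{{col}}` template. A minimal Python sketch of how such a template could be rendered against a decoded row follows; `render_key` is a hypothetical helper, and treating a missing or NULL key column as an error is an assumption, not documented Antfly behavior.

```python
import re
from typing import Any, Mapping

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")

def render_key(template: str, row: Mapping[str, Any]) -> str:
    """Render a key_template such as "id" or "{{tenant_id}}:{{user_id}}"."""
    if "{{" not in template:
        template = "{{" + template + "}}"  # a plain column name is shorthand for {{col}}

    def substitute(match) -> str:
        column = match.group(1)
        value = row.get(column)
        if value is None:
            # Assumption: a document key must never silently become "" or "None".
            raise KeyError(f"key column {column!r} is missing or NULL")
        return str(value)

    return _PLACEHOLDER.sub(substitute, template)
```

For example, `render_key("{{tenant_id}}:{{user_id}}", {"tenant_id": "acme", "user_id": "user-42"})` yields `"acme:user-42"`.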
Key Templates

```json
"key_template": "id"
```

Uses the `id` column value as the Antfly document key.

```json
"key_template": "{{tenant_id}}:{{user_id}}"
```

Builds a composite key from multiple columns, producing keys like `"acme:user-42"`.
Transform Operations
By default (passthrough mode), every column in the PostgreSQL row becomes a field in the Antfly document via $set. You can customize this with on_update and on_delete.
To select and rename columns, use a custom `on_update`:

```json
"on_update": [
  { "op": "$set", "path": "name", "value": "{{user_name}}" },
  { "op": "$set", "path": "email", "value": "{{user_email}}" },
  { "op": "$set", "path": "active", "value": true }
]
```

Literal values (like `true` above) are set on every INSERT/UPDATE. Column references (`{{user_name}}`) resolve to the current row's value.
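The resolution rule above (literals pass through, column references read the current row) can be sketched as follows. `resolve` and `build_update` are hypothetical helpers, and two behaviors are assumptions: a value that is exactly one `{{col}}` keeps the column's type (so `{{score}}` stays an integer), and a reference embedded in longer text is interpolated as a string.

```python
import re
from typing import Any, Dict, List, Mapping, Optional

_WHOLE_REF = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")
_ANY_REF = re.compile(r"\{\{\s*(\w+)\s*\}\}")

def resolve(value: Any, row: Mapping[str, Any]) -> Any:
    """Resolve a transform op's value against the current row."""
    if not isinstance(value, str):
        return value  # JSON literal (true, 42, ...): set as-is on every event
    whole = _WHOLE_REF.match(value)
    if whole:
        # Assumption: a bare {{col}} keeps the column's own type.
        return row.get(whole.group(1))

    # Assumption: references inside longer text are interpolated as strings.
    def interpolate(match) -> str:
        column_value = row.get(match.group(1))
        return "" if column_value is None else str(column_value)

    return _ANY_REF.sub(interpolate, value)

def build_update(row: Mapping[str, Any],
                 on_update: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
    """Fields an INSERT/UPDATE event would write (only $set is modeled here)."""
    if on_update is None:
        return dict(row)  # passthrough: every column becomes a field
    fields: Dict[str, Any] = {}
    for op in on_update:
        if op["op"] == "$set":
            fields[op["path"]] = resolve(op["value"], row)
    return fields
```

With the `on_update` above, columns not referenced by any op (for example an internal flag) never reach the document, which is the point of a custom mapping over passthrough.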
To flatten a JSONB column into top-level fields, use `$merge`:

```json
"on_update": [
  { "op": "$merge", "value": "{{metadata}}" }
]
```

If `metadata` is `{"role": "admin", "team": "eng"}`, the document gets `role` and `team` as top-level fields.
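A sketch of the `$merge` behavior described above, under stated assumptions: the merge is shallow (nested objects are copied as whole values), incoming keys overwrite existing fields, and a NULL column merges nothing. Because pgoutput delivers column values as text by default, the sketch decodes a JSON string before merging. `apply_merge` is a hypothetical helper.

```python
import json
from typing import Any, Dict, Mapping

def apply_merge(doc: Dict[str, Any], row: Mapping[str, Any], column: str) -> Dict[str, Any]:
    """Return a copy of `doc` with the JSONB column's top-level keys merged in."""
    value = row.get(column)
    if value is None:
        return dict(doc)  # assumption: NULL merges nothing
    if isinstance(value, str):
        value = json.loads(value)  # pgoutput's default text format carries JSON text
    if not isinstance(value, dict):
        raise TypeError(f"$merge expects a JSON object in {column!r}, got {type(value).__name__}")
    merged = dict(doc)
    merged.update(value)  # shallow merge: incoming keys overwrite existing fields
    return merged
```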
For a soft delete, use a custom `on_delete`:

```json
"on_delete": [
  { "op": "$set", "path": "active", "value": false },
  { "op": "$currentDate", "path": "deleted_at" }
]
```

To remove the Antfly document entirely, use `$delete_document`:

```json
"on_delete": [
  { "op": "$delete_document" }
]
```

If `on_delete` is omitted, Antfly auto-derives `$unset` operations from the `on_update` paths, which removes only the fields this source set (safe for multi-source tables).
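The three DELETE behaviors above (soft delete, document removal, and the auto-derived `$unset` default) can be modeled as follows. `derive_on_delete` and `apply_delete` are hypothetical helpers; storing `$currentDate` as an ISO-8601 string, supporting only literal `$set` values, and skipping path-less ops such as `$merge` when deriving unsets are assumptions of this sketch.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

def derive_on_delete(on_update: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Default on_delete: $unset each path this source's on_update writes."""
    # Assumption: ops without a "path" (such as $merge) are skipped, since the
    # keys they wrote depend on the data.
    return [{"op": "$unset", "path": op["path"]} for op in on_update if "path" in op]

def apply_delete(doc: Dict[str, Any], on_delete: List[Dict[str, Any]],
                 now: Optional[datetime] = None) -> Optional[Dict[str, Any]]:
    """Apply DELETE-event ops to a copy of doc; None means the document is removed."""
    now = now or datetime.now(timezone.utc)
    result = dict(doc)
    for op in on_delete:
        kind = op["op"]
        if kind == "$delete_document":
            return None
        if kind == "$set":
            result[op["path"]] = op["value"]  # literal values only in this sketch
        elif kind == "$unset":
            result.pop(op["path"], None)
        elif kind == "$currentDate":
            result[op["path"]] = now.isoformat()  # assumption: ISO-8601 string
        else:
            raise ValueError(f"unsupported op {kind!r}")
    return result
```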
Secrets
Never put credentials directly in the DSN. Use the Antfly keystore:
```bash
# Store the DSN as a secret
curl -X PUT http://localhost:8080/api/v1/secrets/pg_dsn \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{"value": "postgres://user:pass@pg-host:5432/mydb?sslmode=require"}'
```

Then reference it:

```json
"dsn": "${secret:pg_dsn}"
```

Cloud Provider Setup
Each managed PostgreSQL service requires slightly different configuration to enable logical replication. The core requirement is the same: wal_level=logical and a user with replication privileges.
Google Cloud SQL
- Enable logical decoding in the Cloud SQL instance flags:

  ```text
  cloudsql.logical_decoding = on
  ```

  This can be set in the Google Cloud Console under Instance > Configuration > Flags, or via `gcloud`:

  ```bash
  gcloud sql instances patch INSTANCE_NAME \
    --database-flags=cloudsql.logical_decoding=on
  ```

  Changing this flag requires a database restart.

- Grant replication privileges to your database user:

  ```sql
  ALTER USER myuser WITH REPLICATION;
  ```

  Or grant the built-in `cloudsql.replication` role if your user cannot be altered.

- Pre-create the publication if your user doesn't own the source table:

  ```sql
  -- Run as the table owner
  CREATE PUBLICATION antfly_pub_users FOR TABLE users;
  ```

  Then reference it in the Antfly config:

  ```json
  {
    "type": "postgres",
    "dsn": "${secret:cloudsql_dsn}",
    "postgres_table": "users",
    "publication_name": "antfly_pub_users"
  }
  ```

- Connect via Cloud SQL Auth Proxy (recommended) or direct IP with SSL:

  ```text
  # With Auth Proxy running on localhost:5432
  "dsn": "postgres://myuser:pass@localhost:5432/mydb"
  ```
Google AlloyDB
- Enable logical decoding via the AlloyDB instance flag:

  ```text
  alloydb.logical_decoding = on
  ```

  Set this in the AlloyDB console or via `gcloud`:

  ```bash
  gcloud alloydb instances update INSTANCE_NAME \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --database-flags=alloydb.logical_decoding=on
  ```

  Changing this flag requires a database restart.

- Grant replication to the user and pre-create publications as with Cloud SQL. AlloyDB uses the same role model, with an `alloydb.replication` role.
Supabase
Supabase enables logical replication by default. To use it with Antfly:
- Create a publication in the Supabase SQL editor:

  ```sql
  CREATE PUBLICATION antfly_pub_users FOR TABLE public.users;
  ```

- Use the direct connection string (not the pooled connection). Find it in your Supabase dashboard under Settings > Database > Connection string > URI. The direct connection typically uses port `5432`, while the pooled connection uses port `6543`.

  PgBouncer (Supabase's connection pooler on port 6543) does not support the replication protocol. You must use the direct connection on port 5432.

- Configure the source with the pre-created publication:

  ```json
  {
    "type": "postgres",
    "dsn": "${secret:supabase_dsn}",
    "postgres_table": "users",
    "publication_name": "antfly_pub_users",
    "slot_name": "antfly_users_slot"
  }
  ```
Neon
- Enable logical replication in the Neon console: go to Project > Settings > Logical Replication and enable it. This sets `wal_level=logical` and restarts the compute endpoint.

- Pre-create the publication and slot in the Neon SQL editor:

  ```sql
  CREATE PUBLICATION antfly_pub_users FOR TABLE users;
  SELECT pg_create_logical_replication_slot('antfly_users_slot', 'pgoutput');
  ```

  Neon manages replication slots carefully due to its branching architecture. Slots are supported on the primary branch. See Neon's logical replication docs for current limitations.

- Use the direct connection string (not the pooled endpoint), set `sslmode=require`, and pass the pre-created publication and slot:

  ```json
  {
    "type": "postgres",
    "dsn": "${secret:neon_dsn}",
    "postgres_table": "users",
    "publication_name": "antfly_pub_users",
    "slot_name": "antfly_users_slot"
  }
  ```
Amazon RDS
- Set the `rds.logical_replication` parameter to `1`. In the RDS console, edit your DB parameter group and set:

  ```text
  rds.logical_replication = 1
  ```

  Or via the AWS CLI:

  ```bash
  aws rds modify-db-parameter-group \
    --db-parameter-group-name my-param-group \
    --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"
  ```

  This requires a reboot of the RDS instance.

- Grant `rds_replication` to your user:

  ```sql
  GRANT rds_replication TO myuser;
  ```

- Pre-create publications as with other managed services, or let Antfly auto-create them if the user owns the tables.
Amazon Aurora PostgreSQL
Same as RDS — set rds.logical_replication = 1 in the cluster parameter group and grant rds_replication to the user. Aurora supports logical replication on the writer instance.
Aurora reader instances cannot be used as replication sources. Always point the DSN at the writer endpoint.
Self-Hosted PostgreSQL
```sql
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;  -- at least 1 per replication source
ALTER SYSTEM SET max_wal_senders = 10;        -- at least 1 per replication source
```

Then restart PostgreSQL. Verify with:

```sql
SHOW wal_level;  -- should return "logical"
```

Multi-Source Tables
Multiple PostgreSQL tables can feed into a single Antfly table. This is useful for denormalization — combining related data from different tables into one searchable document.
```json
{
  "name": "users",
  "replication_sources": [
    {
      "type": "postgres",
      "dsn": "${secret:pg_dsn}",
      "postgres_table": "users",
      "key_template": "id",
      "on_update": [
        { "op": "$set", "path": "name", "value": "{{name}}" },
        { "op": "$set", "path": "email", "value": "{{email}}" }
      ]
    },
    {
      "type": "postgres",
      "dsn": "${secret:pg_dsn}",
      "postgres_table": "scores",
      "key_template": "user_id",
      "on_update": [
        { "op": "$set", "path": "score", "value": "{{score}}" },
        { "op": "$set", "path": "level", "value": "{{level}}" }
      ]
    }
  ]
}
```

Both sources write to the same document because `users.id` and `scores.user_id` hold the same value for a given user. When a row is deleted from `scores`, only `score` and `level` are unset; the `name` and `email` fields from the `users` source remain intact.
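The field-ownership rule can be simulated with an in-memory table. `Table`, `SOURCES`, and `on_change` are hypothetical, and the sketch assumes each `on_update` copies a column into a field of the same name, as in the config above; it models the default `on_delete` (unset this source's paths).

```python
from typing import Any, Dict, List, Tuple

class Table:
    """In-memory stand-in for an Antfly table fed by several sources."""

    def __init__(self) -> None:
        self.docs: Dict[str, Dict[str, Any]] = {}

    def upsert(self, key: str, fields: Dict[str, Any]) -> None:
        self.docs.setdefault(key, {}).update(fields)

    def unset(self, key: str, paths: List[str]) -> None:
        doc = self.docs.get(key, {})
        for path in paths:
            doc.pop(path, None)

# source table -> (key column, paths its on_update sets), mirroring the config above
SOURCES: Dict[str, Tuple[str, List[str]]] = {
    "users": ("id", ["name", "email"]),
    "scores": ("user_id", ["score", "level"]),
}

def on_change(table: Table, source: str, kind: str, row: Dict[str, Any]) -> None:
    key_column, paths = SOURCES[source]
    key = str(row[key_column])  # both sources must render the same key for one user
    if kind == "delete":
        table.unset(key, paths)  # default on_delete: only this source's fields
    else:
        table.upsert(key, {path: row[path] for path in paths})
```

The delete event in this sketch carries only the key column, which is all the default `on_delete` needs.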
Monitoring and Troubleshooting
Check replication status
Look for CDC-related log messages on the Antfly metadata leader:
```text
Starting CDC replication worker table=users pgTable=users slot=antfly_users_users
connected to postgres for replication
starting replication start_lsn=0/0
```

Common issues
"permission denied to create replication slot"
- The database user needs the `REPLICATION` attribute (self-hosted) or the cloud-specific replication role (`cloudsql.replication`, `rds_replication`, etc.)
- Alternative: pre-create the slot and pass `slot_name` in the config
"publication does not exist"
- The user lacks permission to create publications. Pre-create the publication and pass `publication_name`.
"wal_level is not logical"
- The source PostgreSQL instance needs `wal_level=logical`. See Cloud Provider Setup for your platform.
Worker reconnects in a loop
- Check that the DSN is correct and the PostgreSQL instance is reachable from the Antfly metadata node
- Verify SSL settings (`sslmode=require` for most cloud providers)
- Check that you're using the direct connection, not a connection pooler (PgBouncer, Supavisor, etc.)
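Several of these causes are visible in the DSN itself. A heuristic Python sketch follows; `lint_replication_dsn` is hypothetical, treats port 6543 as a pooler only because that is Supabase's pooled port mentioned above, and skips the SSL check for local hosts (for example, the Cloud SQL Auth Proxy on `localhost`). Resolve any `${secret:...}` reference before linting, since the sketch parses the literal URL.

```python
from typing import List
from urllib.parse import parse_qs, urlparse

POOLER_PORTS = {6543}  # Supabase's pooled port, per the Supabase section
SECURE_SSLMODES = {"require", "verify-ca", "verify-full"}
LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}

def lint_replication_dsn(dsn: str) -> List[str]:
    """Heuristic checks for DSN settings that commonly break replication."""
    warnings: List[str] = []
    url = urlparse(dsn)
    if url.scheme not in ("postgres", "postgresql"):
        warnings.append(f"unexpected scheme {url.scheme!r}; use postgres://")
    port = url.port or 5432  # libpq's default port
    if port in POOLER_PORTS:
        warnings.append(f"port {port} looks like a connection pooler; use the direct connection")
    sslmode = parse_qs(url.query).get("sslmode", [""])[0]
    if url.hostname not in LOCAL_HOSTS and sslmode not in SECURE_SSLMODES:
        warnings.append("remote host without sslmode=require (or verify-ca/verify-full)")
    return warnings
```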
DELETE events have no effect
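- The source table needs a suitable `REPLICA IDENTITY`. A DELETE event carries only the replica identity columns of the old row. For tables with a primary key this is automatic (`DEFAULT` identity uses the primary key), which is enough when `key_template` uses only primary-key columns. For tables without a primary key, or when the key needs other columns, run:

  ```sql
  ALTER TABLE users REPLICA IDENTITY FULL;
  ```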
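Which columns a DELETE carries depends on the table's replica identity, recorded in `pg_class.relreplident` (`d` default, `n` nothing, `f` full, `i` index) and readable with `SELECT relreplident FROM pg_class WHERE oid = 'users'::regclass;`. The hypothetical helpers below check whether the columns a `key_template` references are available in the old row under each setting.

```python
from typing import Iterable, List, Sequence

def delete_payload_columns(relreplident: str, all_columns: Sequence[str],
                           identity_columns: Sequence[str]) -> List[str]:
    """Old-row columns a DELETE carries for a given pg_class.relreplident code."""
    if relreplident == "f":         # REPLICA IDENTITY FULL: the entire old row
        return list(all_columns)
    if relreplident in ("d", "i"):  # DEFAULT (primary key) or USING INDEX columns
        return list(identity_columns)
    if relreplident == "n":         # NOTHING: no old-row columns at all
        return []
    raise ValueError(f"unknown relreplident {relreplident!r}")

def can_render_delete_key(template_columns: Iterable[str], relreplident: str,
                          all_columns: Sequence[str],
                          identity_columns: Sequence[str]) -> bool:
    """True if every column the key_template needs is present on DELETE."""
    available = set(delete_payload_columns(relreplident, all_columns, identity_columns))
    return set(template_columns) <= available
```

For the `users` table above, a key built from `id` works with the default identity, while a key built from `email` needs `FULL`.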
Comparison with PostgreSQL Sync
Antfly offers two approaches for syncing data from PostgreSQL:
| | PostgreSQL Replication (this guide) | PostgreSQL Sync |
|---|---|---|
| Mechanism | WAL logical replication | LISTEN/NOTIFY + polling |
| Setup | Declarative (table config) | External Go daemon |
| Latency | Sub-second (WAL streaming) | ~1 second (batched notifications) |
| Requires | wal_level=logical | Triggers on source tables |
| Crash recovery | LSN checkpoints (no data loss) | Periodic full sync catches gaps |
| Best for | Production pipelines | Quick prototyping, JSONB columns |