Antfly has built-in Change Data Capture (CDC) that streams real-time changes from PostgreSQL tables into Antfly documents using logical replication. No external daemon or sync process is required: add a replication_sources block when creating a table.

Architecture

┌───────────────────┐
│   PostgreSQL      │
│   (source table)  │
└────────┬──────────┘
         │  WAL (logical replication)
         │  pgoutput protocol v2
┌────────▼──────────┐
│  Antfly Metadata  │
│  Leader           │
│  ┌──────────────┐ │          ┌──────────────┐
│  │  CDC Worker   │─┼─────────▶  Antfly      │
│  │  (per source) │ │ transform│  Shard       │
│  └──────────────┘ │          └──────────────┘
└───────────────────┘

Each replication source gets a dedicated worker that:

  1. Connects to PostgreSQL using the pgoutput logical decoding plugin
  2. Creates a replication slot and publication (or reuses pre-created ones)
  3. Streams WAL changes and converts them to Antfly transform operations
  4. Persists LSN checkpoints in the metadata Raft store for crash recovery
  5. Reconnects automatically with exponential backoff on transient errors
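
The reconnect behaviour in step 5 can be illustrated with a small sketch. This is not Antfly's implementation: the base delay, growth factor, cap, and jitter below are assumed values chosen for illustration, and the function names are hypothetical.

```python
import random


def backoff_delays(base=1.0, factor=2.0, cap=60.0, jitter=0.0, rng=random.random):
    """Yield reconnect delays in seconds: base, base*factor, ... capped at `cap`.

    `jitter` in [0, 1] shortens each delay by a random fraction of up to that
    amount, so that many workers do not reconnect in lockstep.
    """
    delay = base
    while True:
        yield delay * (1.0 - jitter * rng())
        delay = min(delay * factor, cap)


def first_delays(n, **kwargs):
    """Return the first n delays as a list (convenience for inspection)."""
    gen = backoff_delays(**kwargs)
    return [next(gen) for _ in range(n)]
```

With the assumed defaults and no jitter, the delays double from 1 second until they reach the 60 second cap and then stay there.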

Prerequisites

  • PostgreSQL 15+ (for pgoutput protocol v2)
  • wal_level = logical on the source database
  • A database user with the REPLICATION attribute (or equivalent managed-service role)

Quick Start

1. Enable logical replication on PostgreSQL

-- Self-hosted PostgreSQL
ALTER SYSTEM SET wal_level = logical;
-- Then restart PostgreSQL

For managed services, see Cloud Provider Setup below.

2. Create the source table in PostgreSQL

CREATE TABLE users (
    id TEXT PRIMARY KEY,
    name TEXT,
    email TEXT,
    score INT
);

3. Create an Antfly table with a replication source

curl -X POST http://localhost:8080/api/v1/table \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{
    "name": "users",
    "replication_sources": [
      {
        "type": "postgres",
        "dsn": "postgres://user:pass@pg-host:5432/mydb?sslmode=require",
        "postgres_table": "users",
        "key_template": "id"
      }
    ]
  }'

That's it. Antfly will automatically create a replication slot and publication, then start streaming changes. Any INSERT, UPDATE, or DELETE on the PostgreSQL users table will be reflected in the Antfly users table.
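
The same request can be built from a script. The sketch below uses only the Python standard library and mirrors the curl call above. It assumes the DSN has already been stored as a secret named pg_dsn (see Secrets), and the helper name build_create_table_request is hypothetical.

```python
import base64
import json
import urllib.request


def build_create_table_request(base_url, name, sources, user, password):
    """Build (but do not send) the POST request that creates a table."""
    body = json.dumps({"name": name, "replication_sources": sources}).encode()
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/table",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


source = {
    "type": "postgres",
    "dsn": "${secret:pg_dsn}",  # assumes this secret exists in the keystore
    "postgres_table": "users",
    "key_template": "id",
}
request = build_create_table_request(
    "http://localhost:8080", "users", [source], "admin", "admin"
)
# To actually send it: urllib.request.urlopen(request)
```

Building the request separately from sending it makes the payload easy to inspect or log before anything reaches the server.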

4. Verify replication

-- Insert a row in PostgreSQL
INSERT INTO users (id, name, email, score) VALUES ('user-1', 'Alice', 'alice@example.com', 100);

# Check it appears in Antfly
curl -s http://localhost:8080/api/v1/table/users/key/user-1 -u admin:admin | jq .
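
Replication is asynchronous, so a check issued immediately after the INSERT may run before the change has arrived. A minimal polling sketch follows. Here fetch is a hypothetical callable that you supply (for example, a wrapper around the GET request above that returns None while the key is missing), and the timeout and interval are assumed defaults.

```python
import time


def wait_for_document(fetch, timeout=5.0, interval=0.2,
                      sleep=time.sleep, clock=time.monotonic):
    """Call fetch() until it returns a non-None document or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        doc = fetch()
        if doc is not None:
            return doc
        if clock() >= deadline:
            raise TimeoutError("document did not appear before the timeout")
        sleep(interval)
```

The sleep and clock parameters exist only so the helper can be exercised without real waiting.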

Configuration Reference

Replication Source Fields

| Field | Required | Default | Description |
|---|---|---|---|
| type | Yes | | Must be "postgres" |
| dsn | Yes | | PostgreSQL connection string. Supports ${secret:key} references. |
| postgres_table | Yes | | Source table name in PostgreSQL |
| key_template | No | "id" | How to derive the Antfly document key. Plain column name or {{col}} template. |
| slot_name | No | auto-derived | Replication slot name. Set this when using pre-created slots. |
| publication_name | No | auto-derived | Publication name. Set this when using pre-created publications. |
| on_update | No | passthrough | Transform ops for INSERT/UPDATE events |
| on_delete | No | auto-derived | Transform ops for DELETE events |

Key Templates

"key_template": "id"

Uses the id column value as the Antfly document key.

"key_template": "{{tenant_id}}:{{user_id}}"

Composite key from multiple columns, producing keys like "acme:user-42".
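
The two forms can be modelled with a short sketch. It is an illustration of the rules described above rather than Antfly's code: the function name render_key is hypothetical, and converting values with str() and raising KeyError for a missing column are assumptions.

```python
import re

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_key(template, row):
    """Derive a document key from a row (a dict of column name -> value)."""
    if "{{" not in template:
        # Plain column name: the key is that column's value.
        return str(row[template])
    # Template: substitute every {{col}} with the column's value.
    return _PLACEHOLDER.sub(lambda m: str(row[m.group(1)]), template)
```

For example, render_key("{{tenant_id}}:{{user_id}}", {"tenant_id": "acme", "user_id": "user-42"}) returns "acme:user-42".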

Transform Operations

By default (passthrough mode), every column in the PostgreSQL row becomes a field in the Antfly document via $set. You can customize this with on_update and on_delete.

Custom on_update — select and rename columns:

"on_update": [
  { "op": "$set", "path": "name", "value": "{{user_name}}" },
  { "op": "$set", "path": "email", "value": "{{user_email}}" },
  { "op": "$set", "path": "active", "value": true }
]

Literal values (like true above) are set on every INSERT/UPDATE. Column references ({{user_name}}) resolve to the current row's value.
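
To make the resolution rule concrete, here is a minimal sketch of how such a list of $set ops could be evaluated against a row. It is not Antfly's implementation. It assumes that a value consisting solely of {{col}} resolves to the raw column value, that every other value is a literal, and that path names a top-level field; the function names are hypothetical.

```python
import re

_WHOLE_REF = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def resolve(value, row):
    """Return the row's column value for "{{col}}", or the literal unchanged."""
    if isinstance(value, str):
        match = _WHOLE_REF.match(value)
        if match:
            return row[match.group(1)]
    return value


def apply_set_ops(document, ops, row):
    """Apply a list of $set ops to a copy of `document` and return the copy."""
    result = dict(document)
    for op in ops:
        if op["op"] != "$set":
            raise ValueError(f"unsupported op in this sketch: {op['op']}")
        result[op["path"]] = resolve(op["value"], row)
    return result


on_update = [
    {"op": "$set", "path": "name", "value": "{{user_name}}"},
    {"op": "$set", "path": "email", "value": "{{user_email}}"},
    {"op": "$set", "path": "active", "value": True},
]
row = {"user_name": "Alice", "user_email": "alice@example.com", "score": 100}
doc = apply_set_ops({}, on_update, row)
```

Note that the score column is not copied: once on_update is specified, only the listed paths are written.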

$merge — flatten a JSONB column into top-level fields:

"on_update": [
  { "op": "$merge", "value": "{{metadata}}" }
]

If metadata is {"role": "admin", "team": "eng"}, the document gets role and team as top-level fields.
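
The flattening step can be sketched as follows. This is an illustration only: the function name apply_merge is hypothetical, and the choice that merged keys overwrite existing fields with the same name is an assumption, not documented behaviour.

```python
import json


def apply_merge(document, column_value):
    """Merge a JSONB column's keys into the top level of a copy of `document`."""
    # A JSONB value may arrive as text or as an already-decoded mapping.
    fields = json.loads(column_value) if isinstance(column_value, str) else column_value
    if not isinstance(fields, dict):
        raise TypeError("$merge expects a JSON object")
    merged = dict(document)
    merged.update(fields)  # assumed: merged keys overwrite same-named fields
    return merged
```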

Custom on_delete — soft delete pattern:

"on_delete": [
  { "op": "$set", "path": "active", "value": false },
  { "op": "$currentDate", "path": "deleted_at" }
]

$delete_document — remove the Antfly document entirely:

"on_delete": [
  { "op": "$delete_document" }
]

If on_delete is omitted, Antfly auto-derives $unset operations from the on_update paths, which removes only the fields this source set (safe for multi-source tables).
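
The derivation described above can be sketched in a few lines. This is an assumption-laden illustration, not Antfly's code: the function name is hypothetical, and ops without a path (such as $merge) are simply skipped here because the source does not say how they are handled.

```python
def derive_on_delete(on_update):
    """Build $unset ops for every path that the on_update ops write to."""
    paths = []
    for op in on_update:
        path = op.get("path")
        if path is not None and path not in paths:
            paths.append(path)  # keep first-seen order, drop duplicates
    return [{"op": "$unset", "path": path} for path in paths]
```

Applied to an on_update list that sets score and level, this yields two $unset ops, one for each of those paths.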

Secrets

Do not embed credentials in the dsn field of a table configuration. Store the DSN in the Antfly keystore instead:

# Store the DSN as a secret
curl -X PUT http://localhost:8080/api/v1/secrets/pg_dsn \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{"value": "postgres://user:pass@pg-host:5432/mydb?sslmode=require"}'

Then reference it:

"dsn": "${secret:pg_dsn}"

Cloud Provider Setup

Each managed PostgreSQL service requires slightly different configuration to enable logical replication. The core requirement is the same: wal_level=logical and a user with replication privileges.

Google Cloud SQL

  1. Enable logical decoding in the Cloud SQL instance flags:

    cloudsql.logical_decoding = on

    This can be set in the Google Cloud Console under Instance > Configuration > Flags, or via gcloud:

    gcloud sql instances patch INSTANCE_NAME \
      --database-flags=cloudsql.logical_decoding=on

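    Changing this flag requires a database restart. Note that --database-flags replaces the instance's entire flag list, so include any flags that are already set.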

  2. Grant replication privileges to your database user:

    ALTER USER myuser WITH REPLICATION;

    Or grant the built-in cloudsql.replication role if your user cannot be altered.

  3. Pre-create the publication if your user doesn't own the source table:

    -- Run as the table owner
    CREATE PUBLICATION antfly_pub_users FOR TABLE users;

    Then reference it in the Antfly config:

    {
      "type": "postgres",
      "dsn": "${secret:cloudsql_dsn}",
      "postgres_table": "users",
      "publication_name": "antfly_pub_users"
    }

  4. Connect via Cloud SQL Auth Proxy (recommended) or direct IP with SSL:

    # With Auth Proxy running on localhost:5432
    "dsn": "postgres://myuser:pass@localhost:5432/mydb"

Google AlloyDB

  1. Enable logical decoding via the AlloyDB instance flag:

    alloydb.logical_decoding = on

    Set this in the AlloyDB console or via gcloud:

    gcloud alloydb instances update INSTANCE_NAME \
      --cluster=CLUSTER_NAME \
      --region=REGION \
      --database-flags=alloydb.logical_decoding=on

    Changing this flag requires a database restart.

  2. Grant replication to the user and pre-create publications as with Cloud SQL (AlloyDB uses the same alloydb.replication role model).

Supabase

Supabase enables logical replication by default. To use it with Antfly:

  1. Create a publication in the Supabase SQL editor:

    CREATE PUBLICATION antfly_pub_users FOR TABLE public.users;

  2. Use the direct connection string (not the pooled connection). Find it in your Supabase dashboard under Settings > Database > Connection string > URI. The direct connection typically uses port 5432, while the pooled connection uses port 6543.

    Supabase's connection pooler on port 6543 (PgBouncer or Supavisor, depending on the project) does not support the replication protocol. You must use the direct connection on port 5432.

  3. Configure the source with the pre-created publication:

    {
      "type": "postgres",
      "dsn": "${secret:supabase_dsn}",
      "postgres_table": "users",
      "publication_name": "antfly_pub_users",
      "slot_name": "antfly_users_slot"
    }

Neon

  1. Enable logical replication in the Neon console:

    Go to Project > Settings > Logical Replication and enable it. This sets wal_level=logical and restarts the compute endpoint.

  2. Pre-create the publication in the Neon SQL editor (Antfly creates the replication slot itself unless slot_name is set):

    CREATE PUBLICATION antfly_pub_users FOR TABLE users;

    Neon manages replication slots carefully due to its branching architecture. Slots are supported on the primary branch. See Neon's logical replication docs for current limitations.

  3. Use the direct connection string (not the pooled endpoint). Set sslmode=require:

    {
      "type": "postgres",
      "dsn": "${secret:neon_dsn}",
      "postgres_table": "users",
      "publication_name": "antfly_pub_users"
    }

Amazon RDS

  1. Set the rds.logical_replication parameter to 1:

    In the RDS console, edit your DB parameter group and set:

    rds.logical_replication = 1

    Or via the AWS CLI:

    aws rds modify-db-parameter-group \
      --db-parameter-group-name my-param-group \
      --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"

    This requires a reboot of the RDS instance.

  2. Grant rds_replication to your user:

    GRANT rds_replication TO myuser;

  3. Pre-create publications as with other managed services, or let Antfly auto-create them if the user owns the tables.

Amazon Aurora PostgreSQL

Same as RDS — set rds.logical_replication = 1 in the cluster parameter group and grant rds_replication to the user. Aurora supports logical replication on the writer instance.

Aurora reader instances cannot be used as replication sources. Always point the DSN at the writer endpoint.

Self-Hosted PostgreSQL

ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;  -- at least 1 per replication source
ALTER SYSTEM SET max_wal_senders = 10;        -- at least 1 per replication source

Then restart PostgreSQL. Verify with:

SHOW wal_level;  -- should return "logical"
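
The three settings can be checked together before adding replication sources. The sketch below takes the values as a plain dictionary (for example, collected by running SHOW for each setting) and does not connect to PostgreSQL itself. The function name and message wording are hypothetical.

```python
def check_replication_settings(settings, source_count):
    """Return a list of problems found in PostgreSQL settings.

    `settings` maps setting names to the strings returned by SHOW, e.g.
    {"wal_level": "logical", "max_replication_slots": "10", ...}.
    """
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for name in ("max_replication_slots", "max_wal_senders"):
        value = int(settings.get(name, "0"))
        if value < source_count:
            problems.append(
                f"{name}={value} is below the {source_count} source(s) configured"
            )
    return problems
```

An empty list means the settings satisfy the minimums stated above. Other consumers of replication slots or WAL senders on the same server would need headroom beyond this.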

Multi-Source Tables

Multiple PostgreSQL tables can feed into a single Antfly table. This is useful for denormalization — combining related data from different tables into one searchable document.

{
  "name": "users",
  "replication_sources": [
    {
      "type": "postgres",
      "dsn": "${secret:pg_dsn}",
      "postgres_table": "users",
      "key_template": "id",
      "on_update": [
        { "op": "$set", "path": "name", "value": "{{name}}" },
        { "op": "$set", "path": "email", "value": "{{email}}" }
      ]
    },
    {
      "type": "postgres",
      "dsn": "${secret:pg_dsn}",
      "postgres_table": "scores",
      "key_template": "user_id",
      "on_update": [
        { "op": "$set", "path": "score", "value": "{{score}}" },
        { "op": "$set", "path": "level", "value": "{{level}}" }
      ]
    }
  ]
}

When a row is deleted from scores, only score and level are unset — the name and email fields from the users source remain intact.
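
The behaviour can be simulated in memory to see why the two sources do not interfere. This is a toy model under the assumptions stated in this guide (each source sets its own paths and, on delete, unsets only those paths); it is not Antfly's implementation, and the helper names are hypothetical.

```python
def apply_update(document, paths_to_columns, row):
    """$set each document path from the given row column."""
    for path, column in paths_to_columns.items():
        document[path] = row[column]


def apply_delete(document, paths_to_columns):
    """$unset only the paths this source writes."""
    for path in paths_to_columns:
        document.pop(path, None)


# Document path -> source column, mirroring the two on_update lists above.
users_source = {"name": "name", "email": "email"}
scores_source = {"score": "score", "level": "level"}

documents = {}  # stand-in for the Antfly table: key -> document
doc = documents.setdefault("user-1", {})
apply_update(doc, users_source,
             {"id": "user-1", "name": "Alice", "email": "alice@example.com"})
apply_update(doc, scores_source, {"user_id": "user-1", "score": 100, "level": 3})
after_both = dict(doc)

# Equivalent of: DELETE FROM scores WHERE user_id = 'user-1'
apply_delete(doc, scores_source)
```

After the delete, the document still holds name and email, and only score and level are gone.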

Monitoring and Troubleshooting

Check replication status

Look for CDC-related log messages on the Antfly metadata leader:

Starting CDC replication worker  table=users pgTable=users slot=antfly_users_users
connected to postgres for replication
starting replication  start_lsn=0/0
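
LSNs such as the start_lsn value in the log above are written as two hexadecimal numbers separated by a slash: the high and low 32 bits of a 64-bit WAL position. When comparing positions from logs, convert them to integers first, because comparing the text form gives wrong answers. The helper names below are hypothetical.

```python
def parse_lsn(text):
    """Convert a PostgreSQL LSN such as "0/16B3748" to a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def format_lsn(value):
    """Convert a 64-bit integer back to PostgreSQL's "X/Y" hexadecimal form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"
```

For instance, "A/0" is an earlier position than "10/0", which parse_lsn orders correctly while a plain string comparison does not.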

Common issues

"permission denied to create replication slot"

  • The database user needs REPLICATION attribute (self-hosted) or the cloud-specific replication role (cloudsql.replication, rds_replication, etc.)
  • Alternative: pre-create the slot and pass slot_name in the config

"publication does not exist"

  • The user lacks permission to create publications. Pre-create it and pass publication_name.

"wal_level is not logical"

  • The source PostgreSQL instance needs wal_level=logical. See Cloud Provider Setup for your platform.

Worker reconnects in a loop

  • Check that the DSN is correct and the PostgreSQL instance is reachable from the Antfly metadata node
  • Verify SSL settings (sslmode=require for most cloud providers)
  • Check that you're using the direct connection, not a connection pooler (PgBouncer, Supavisor, etc.)
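
Some of these checks can be automated before a DSN is handed to Antfly. The sketch below inspects a URI-style DSN using only the standard library. Its rules are heuristics drawn from this guide (port 6543 is the pooled Supabase port; most cloud providers expect sslmode=require), and the function name and warning text are hypothetical.

```python
from urllib.parse import parse_qs, urlsplit


def check_dsn(dsn):
    """Return a list of warnings for a postgres:// URI-style DSN."""
    warnings = []
    parts = urlsplit(dsn)
    if parts.scheme not in ("postgres", "postgresql"):
        warnings.append(f"unexpected scheme: {parts.scheme!r}")
    if parts.port == 6543:
        warnings.append(
            "port 6543 is commonly a pooled endpoint; use the direct connection"
        )
    sslmode = parse_qs(parts.query).get("sslmode", [None])[0]
    if sslmode is None and parts.hostname not in ("localhost", "127.0.0.1"):
        warnings.append(
            "sslmode is not set; most cloud providers need sslmode=require"
        )
    return warnings
```

An empty list does not prove the connection will work. It only rules out the misconfigurations listed above.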

DELETE events have no effect

  • The source table needs REPLICA IDENTITY set. For tables with a primary key, this is automatic (DEFAULT). For tables without a primary key, run:
    ALTER TABLE users REPLICA IDENTITY FULL;

Comparison with PostgreSQL Sync

Antfly offers two approaches for syncing data from PostgreSQL:

| | PostgreSQL Replication (this guide) | PostgreSQL Sync |
|---|---|---|
| Mechanism | WAL logical replication | LISTEN/NOTIFY + polling |
| Setup | Declarative (table config) | External Go daemon |
| Latency | Sub-second (WAL streaming) | ~1 second (batched notifications) |
| Requires | wal_level=logical | Triggers on source tables |
| Crash recovery | LSN checkpoints (no data loss) | Periodic full sync catches gaps |
| Best for | Production pipelines | Quick prototyping, JSONB columns |