This example demonstrates real-time synchronization from a Postgres JSONB column to Antfly using:

  1. Linear Merge API for efficient batch syncing
  2. LISTEN/NOTIFY for real-time change detection
  3. Periodic full syncs to catch any missed changes

Perfect for keeping Antfly in sync with your existing Postgres database!

Features#

✅ Real-time Updates - Changes in Postgres instantly sync to Antfly via LISTEN/NOTIFY
✅ Efficient Batching - Rapid changes are batched together (1-second window)
✅ Content Hash Optimization - Unchanged documents are skipped (no unnecessary writes)
✅ Automatic Deletion - Documents deleted from Postgres are removed from Antfly
✅ Periodic Full Sync - Configurable full sync to ensure consistency
✅ Graceful Shutdown - Clean exit with statistics on Ctrl+C
✅ Production Ready - Connection pooling, error handling, metrics

Architecture#

┌─────────────────┐
│   Postgres DB   │
│  (JSONB table)  │
└────┬───────┬────┘
     │       │
     │       │ LISTEN/NOTIFY real-time
     │       │ 
     │       │
     │   ┌───▼────────────┐          ┌──────────────┐
     │   │  postgres-sync │─────────▶│    Antfly    │
     │   │    (daemon)    │  Linear  │  (search &   │
     │   └───▲────────────┘  Merge   │   storage)   │
     │       │                       └──────────────┘
     │       │
     └───────┘ Periodic "insurance" sync every 5 min

How It Works#

  1. Initial Full Sync:
    • Queries all rows from Postgres table
    • Uses Linear Merge API to sync to Antfly
    • Content hashing skips unchanged documents
  2. Real-time Updates (LISTEN/NOTIFY):

    • Postgres trigger fires on INSERT/UPDATE/DELETE
    • Sends notification with change details
    • Go daemon receives notification
    • Batches rapid changes (1-second window)
    • Syncs batch to Antfly via Linear Merge
  3. Periodic Full Sync:

    • Runs every N minutes (configurable)
    • Ensures consistency if any notifications were missed
    • Catches documents modified outside triggers

Prerequisites#

1. Postgres Database#

# Using Docker
docker run --name postgres-antfly-demo \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=antfly_demo \
  -p 5432:5432 \
  -d postgres:16

# Or use existing Postgres instance

2. Antfly Running#

cd /path/to/antfly
go run ./cmd/antfly swarm

3. Build the sync tool#

cd /path/to/antfly
go build -o postgres-sync ./examples/postgres-sync

Quick Start#

Step 1: Set up Postgres schema#

# Connect to your Postgres database
psql postgresql://postgres:secret@localhost:5432/antfly_demo

# Run the schema setup
\i examples/postgres-sync/schema.sql

This creates:

  • documents table with JSONB data column
  • Triggers for LISTEN/NOTIFY on changes
  • Sample data (5 documents)

Step 2: Start the sync daemon#

export POSTGRES_URL="postgresql://postgres:secret@localhost:5432/antfly_demo"

./postgres-sync \
  --postgres "$POSTGRES_URL" \
  --antfly http://localhost:8080/api/v1 \
  --pg-table documents \
  --antfly-table postgres_docs \
  --create-table \
  --full-sync-interval 5m

You should see:

=== Postgres to Antfly Real-time Sync ===
Postgres: postgresql://postgres:***@localhost:5432/antfly_demo
Antfly: http://localhost:8080/api/v1
Table: documents.data -> postgres_docs
Full sync interval: 5m0s

✓ Created Antfly table 'postgres_docs'
Performing initial full sync...
Full sync: Found 5 records in Postgres
  Batch 1-5: 5 upserted, 0 skipped, 0 deleted
✓ Full sync complete: 5 upserted, 0 skipped, 0 deleted in 123ms

Starting real-time sync (LISTEN/NOTIFY)...
✓ Listening on channel 'documents_changes'
✓ Real-time sync active

Sync is running. Press Ctrl+C to stop.

Step 3: Test real-time sync#

In another terminal, connect to Postgres and make changes:

psql $POSTGRES_URL
-- Insert a new document
INSERT INTO documents (id, data) VALUES
  ('test_001', '{"title": "Real-time Test", "content": "This syncs instantly!"}');

-- Update a document
UPDATE documents
SET data = data || '{"updated": true}'
WHERE id = 'doc_001';

-- Delete a document
DELETE FROM documents WHERE id = 'test_001';

Watch the sync daemon output:

← Change detected: INSERT test_001
→ Real-time sync: 1 upserted, 0 skipped

← Change detected: UPDATE doc_001
→ Real-time sync: 1 upserted, 0 skipped

← Change detected: DELETE test_001
→ Real-time sync: 1 deleted

Configuration Options#

| Flag | Description | Default |
|------|-------------|---------|
| `--postgres` | Postgres connection URL | `$POSTGRES_URL` |
| `--antfly` | Antfly API URL | `http://localhost:8080/api/v1` |
| `--pg-table` | Postgres table name | `documents` |
| `--id-column` | ID column name | `id` |
| `--data-column` | JSONB data column name | `data` |
| `--antfly-table` | Antfly table name | `postgres_docs` |
| `--full-sync-interval` | Full sync interval (0 = disable) | `5m` |
| `--batch-size` | Batch size for sync | `1000` |
| `--create-table` | Create Antfly table | `false` |
| `--num-shards` | Number of shards for new table | `3` |

Usage Examples#

Basic Usage#

# Minimal config (uses environment variable)
export POSTGRES_URL="postgresql://user:pass@localhost/db"
./postgres-sync --create-table

Custom Table Names#

./postgres-sync \
  --postgres "$POSTGRES_URL" \
  --pg-table products \
  --id-column product_id \
  --data-column metadata \
  --antfly-table product_catalog

Disable Periodic Sync (Real-time Only)#

./postgres-sync \
  --postgres "$POSTGRES_URL" \
  --full-sync-interval 0

High-Volume Sync#

./postgres-sync \
  --postgres "$POSTGRES_URL" \
  --batch-size 5000 \
  --full-sync-interval 30m

Document Schema#

Each Postgres row is stored in Antfly as:

{
  "id": "doc_001",
  "data": {
    "title": "Getting Started",
    "content": "Welcome to Antfly",
    "category": "tutorial"
  },
  "source": "postgres",
  "synced_at": "2024-01-15T10:30:00Z"
}
  • id: From Postgres ID column
  • data: The JSONB column content
  • source: Always "postgres"
  • synced_at: Timestamp of last sync

Testing Real-time Sync#

We provide a demo SQL script with various test scenarios:

psql $POSTGRES_URL -f examples/postgres-sync/demo-changes.sql

This demonstrates:

  • Single inserts
  • Bulk inserts (batching)
  • Updates
  • Deletes
  • Transactional changes
  • Random update generator

Interactive Testing#

-- 1. Insert new documents
INSERT INTO documents (id, data) VALUES
  ('demo_001', '{"title": "Demo", "content": "Testing sync"}');

-- Watch sync daemon: "← Change detected: INSERT demo_001"

-- 2. Bulk insert (tests batching)
INSERT INTO documents (id, data)
SELECT
  'bulk_' || i,
  jsonb_build_object('title', 'Bulk Doc ' || i, 'index', i)
FROM generate_series(1, 100) AS i;

-- Watch sync daemon batch them together!

-- 3. Update multiple records
UPDATE documents
SET data = data || '{"updated": true}'
WHERE id LIKE 'bulk_%';

-- 4. Delete them
DELETE FROM documents WHERE id LIKE 'bulk_%';

Monitoring#

The sync daemon prints statistics every 30 seconds:

--- Sync Statistics ---
Total synced: 1,234
Total skipped: 5,678
Total deleted: 42
Real-time updates: 89
Errors: 0
Last full sync: 2m30s ago
Last real-time sync: 5s ago
----------------------

How LISTEN/NOTIFY Works#

Postgres Side#

When you INSERT/UPDATE/DELETE a row, a trigger fires:

CREATE TRIGGER documents_change_trigger
  AFTER INSERT OR UPDATE OR DELETE ON documents
  FOR EACH ROW
  EXECUTE FUNCTION notify_document_change();

The function sends a notification:

PERFORM pg_notify('documents_changes', json_build_object(
  'operation', 'INSERT',
  'id', NEW.id,
  'data', NEW.data,
  'timestamp', NOW()
)::text);

Go Side#

The sync daemon listens for notifications:

// StartRealtimeSync starts listening for Postgres notifications
func (ps *PostgresSync) StartRealtimeSync(ctx context.Context) error {
	// Use a dedicated connection for LISTEN
	conn, err := pgx.Connect(ctx, ps.config.PostgresURL)
	if err != nil {
		return fmt.Errorf("failed to create listen connection: %w", err)
	}
	defer func() {
		if err := conn.Close(ctx); err != nil {
			log.Printf("Warning: Failed to close connection: %v", err)
		}
	}()

	// Start listening for notifications
	channelName := ps.config.TableName + "_changes"
	_, err = conn.Exec(ctx, "LISTEN "+pgx.Identifier{channelName}.Sanitize())
	if err != nil {
		return fmt.Errorf("failed to LISTEN: %w", err)
	}

	fmt.Printf("✓ Listening on channel '%s'\n", channelName)

	// Buffer for batching rapid changes
	changeBatch := make(map[string]ChangeEvent)
	var batchMu sync.Mutex
	// ... (full listing in Project Files below)

Batching Strategy#

Rapid changes are batched to avoid overwhelming Antfly:

  • Changes accumulate in a 1-second window
  • Batch is processed every second
  • Multiple changes to the same document are de-duplicated
  • Linear Merge API handles the batch efficiently

Example:

0.0s: INSERT doc_001
0.1s: UPDATE doc_001
0.3s: INSERT doc_002
0.5s: UPDATE doc_001
1.0s: → Process batch {doc_001, doc_002} (2 records)

Performance Characteristics#

| Scenario | Performance |
|----------|-------------|
| Initial sync (10K docs) | ~5-10 seconds |
| Re-sync unchanged | ~2-3 seconds (all skipped) |
| Real-time insert | <100ms latency |
| Bulk insert (1000 docs) | Batched in 1-2 seconds |
| Full sync overhead | Negligible (content hash check) |

Optimization Tips#

  1. Batch Size: Increase for large tables (up to 10,000)
  2. Full Sync Interval: Reduce if notifications are unreliable
  3. Connection Pool: Increase for high-volume tables
  4. Indexes: Add GIN index on JSONB for faster queries

Troubleshooting#

Connection Issues#

Error: failed to connect to Postgres

Solution: Check Postgres URL and network:

psql "$POSTGRES_URL" -c "SELECT 1"

No Notifications Received#

# Test notifications manually
psql $POSTGRES_URL

-- Terminal 1:
LISTEN documents_changes;

-- Terminal 2:
INSERT INTO documents (id, data) VALUES ('test', '{}');

-- Terminal 1 should show: Asynchronous notification received

Common issues:

  • Trigger not created → Run schema.sql again
  • Wrong channel name → Check --pg-table matches table name
  • Connection dropped → Daemon will reconnect automatically

Documents Not Syncing#

Check the daemon logs for errors:

Error processing batch: linear merge failed: ...

Common issues:

  • Antfly not running → Start Antfly first
  • Table doesn't exist → Use --create-table flag
  • Invalid JSON in JSONB column → Check Postgres data

High Memory Usage#

If syncing a very large table:

  1. Reduce --batch-size (try 500 or 1000)
  2. Increase --full-sync-interval (try 30m or 1h)
  3. Add connection pool limits

Production Deployment#

Docker Compose#

version: '3.8'
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: production
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./schema.sql:/docker-entrypoint-initdb.d/01-schema.sql

  antfly:
    image: antfly:latest
    command: ["swarm"]
    ports:
      - "8080:8080"

  postgres-sync:
    build: ./examples/postgres-sync
    environment:
      POSTGRES_URL: postgresql://postgres:${DB_PASSWORD}@postgres/production
    command:
      - --antfly=http://antfly:8080/api/v1
      - --create-table
      - --full-sync-interval=10m
    depends_on:
      - postgres
      - antfly
    restart: unless-stopped

volumes:
  postgres-data:

Kubernetes#

apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres-sync
spec:
  replicas: 1  # Only run one instance (LISTEN/NOTIFY is single-consumer)
  template:
    spec:
      containers:
      - name: postgres-sync
        image: antfly/postgres-sync:latest
        env:
        - name: POSTGRES_URL
          valueFrom:
            secretKeyRef:
              name: postgres-credentials
              key: url
        args:
        - --antfly=http://antfly-service:8080/api/v1
        - --create-table
        - --full-sync-interval=10m

systemd Service#

[Unit]
Description=Antfly Postgres Sync
After=network.target postgresql.service

[Service]
Type=simple
User=antfly
Environment=POSTGRES_URL=postgresql://localhost/production
ExecStart=/usr/local/bin/postgres-sync \
  --antfly=http://localhost:8080/api/v1 \
  --create-table \
  --full-sync-interval=10m
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

Advanced Use Cases#

Multi-table Sync#

Run multiple sync daemons for different tables:

# Terminal 1: Sync products table
./postgres-sync --pg-table products --antfly-table products_search

# Terminal 2: Sync customers table
./postgres-sync --pg-table customers --antfly-table customers_search

Conditional Sync#

Modify notify_document_change() to filter:

CREATE OR REPLACE FUNCTION notify_document_change()
RETURNS TRIGGER AS $$
BEGIN
  -- Only sync published documents
  IF (NEW.data->>'status' = 'published') THEN
    PERFORM pg_notify(...);
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Transform Data During Sync#

Modify the sync tool to transform data:

// In processBatchedChanges()
doc := make(map[string]interface{})
doc["id"] = id
doc["data"] = jsonData

// Add computed fields
if title, ok := jsonData["title"].(string); ok {
  doc["title_lowercase"] = strings.ToLower(title)
}

// Add embeddings (if configured)
if content, ok := jsonData["content"].(string); ok {
  embedding := generateEmbedding(content)
  doc["embedding"] = embedding
}

Comparison with Other Approaches#

| Approach | Latency | Overhead | Complexity |
|----------|---------|----------|------------|
| LISTEN/NOTIFY (this) | <100ms | Low | Medium |
| Polling | 1-60s | High | Low |
| CDC (Debezium) | <1s | Medium | High |
| Logical Replication | <1s | Low | Very High |

LISTEN/NOTIFY is the sweet spot for most use cases!

Limitations#

⚠️ Single Consumer: Only one sync daemon should run per table (LISTEN is not load-balanced)

⚠️ Notification Loss: If daemon is down, notifications are lost (periodic full sync recovers)

⚠️ Payload Size: Postgres notification payload is limited to 8KB (we only send ID + operation, not full data)

⚠️ Transaction Delay: Notifications only fire on COMMIT (delayed for long transactions)

Extending the Example#

Ideas for customization:

  1. Add Filtering: Only sync certain document types
  2. Add Enrichment: Generate embeddings during sync
  3. Add Validation: Validate JSON schema before syncing
  4. Add Metrics: Export Prometheus metrics
  5. Add Dead Letter Queue: Store failed syncs for retry
  6. Multi-tenancy: Support multiple databases
See Also#

  • docsaf - Sync documentation files to Antfly
  • Linear Merge API docs - See work-log/006-create-linear-merge-api/

Project Files#

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"maps"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/antflydb/antfly-go/antfly"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// ANCHOR: config_type
// Config holds the sync configuration
type Config struct {
	PostgresURL       string
	AntflyURL         string
	TableName         string
	IDColumn          string
	DataColumn        string
	AntflyTable       string
	FullSyncInterval  time.Duration
	BatchSize         int
	CreateTable       bool
	ReplicationFactor int
}

// ANCHOR_END: config_type

// ANCHOR: postgres_sync_type
// PostgresSync manages syncing Postgres JSONB data to Antfly
type PostgresSync struct {
	config       Config
	pgPool       *pgxpool.Pool
	antflyClient *antfly.AntflyClient
	lastSyncTime time.Time
	stats        SyncStats
}

// ANCHOR_END: postgres_sync_type

// ANCHOR: sync_stats_type
// SyncStats tracks sync statistics
type SyncStats struct {
	mu               sync.RWMutex
	TotalSynced      int64
	TotalSkipped     int64
	TotalDeleted     int64
	TotalErrors      int64
	LastFullSync     time.Time
	LastRealtimeSync time.Time
	RealtimeUpdates  int64
}

// ANCHOR_END: sync_stats_type

// ANCHOR: change_event_type
// ChangeEvent represents a database change notification
type ChangeEvent struct {
	Operation string         `json:"operation"` // INSERT, UPDATE, DELETE
	ID        string         `json:"id"`
	Data      map[string]any `json:"data,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
}

// ANCHOR_END: change_event_type

func main() {
	config := parseFlags()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Create sync manager ("syncer", not "sync", to avoid shadowing the sync package)
	syncer, err := NewPostgresSync(ctx, config)
	if err != nil {
		log.Fatalf("Failed to create sync manager: %v", err)
	}
	defer syncer.Close()

	fmt.Printf("=== Postgres to Antfly Real-time Sync ===\n")
	fmt.Printf("Postgres: %s\n", maskPassword(config.PostgresURL))
	fmt.Printf("Antfly: %s\n", config.AntflyURL)
	fmt.Printf("Table: %s.%s -> %s\n", config.TableName, config.DataColumn, config.AntflyTable)
	fmt.Printf("Full sync interval: %v\n", config.FullSyncInterval)
	fmt.Printf("Batch size: %d\n\n", config.BatchSize)

	// Initial full sync
	fmt.Println("Performing initial full sync...")
	if err := syncer.FullSync(ctx); err != nil {
		log.Fatalf("Initial sync failed: %v", err)
	}
	fmt.Println("✓ Initial sync complete")
	fmt.Println()

	// Start real-time sync
	fmt.Println("Starting real-time sync (LISTEN/NOTIFY)...")
	go func() {
		if err := syncer.StartRealtimeSync(ctx); err != nil {
			log.Printf("Real-time sync error: %v", err)
		}
	}()

	// Start periodic full sync
	if config.FullSyncInterval > 0 {
		go func() {
			ticker := time.NewTicker(config.FullSyncInterval)
			defer ticker.Stop()

			for {
				select {
				case <-ticker.C:
					fmt.Println("\n--- Periodic full sync starting ---")
					if err := syncer.FullSync(ctx); err != nil {
						log.Printf("Periodic sync failed: %v", err)
					}
					fmt.Println("--- Periodic full sync complete ---")
					fmt.Println()
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Stats reporter
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				syncer.PrintStats()
			case <-ctx.Done():
				return
			}
		}
	}()

	fmt.Println("✓ Real-time sync active")
	fmt.Println("\nSync is running. Press Ctrl+C to stop.")
	fmt.Println()

	// Wait for shutdown signal
	<-sigChan
	fmt.Println("\n\nShutting down gracefully...")
	cancel()

	// Give time for cleanup
	time.Sleep(1 * time.Second)
	syncer.PrintStats()
	fmt.Println("\n✓ Sync stopped")
}

// NewPostgresSync creates a new sync manager
func NewPostgresSync(ctx context.Context, config Config) (*PostgresSync, error) {
	// Create Postgres connection pool
	pgConfig, err := pgxpool.ParseConfig(config.PostgresURL)
	if err != nil {
		return nil, fmt.Errorf("invalid postgres URL: %w", err)
	}

	pgConfig.MaxConns = 10
	pgConfig.MinConns = 2

	pgPool, err := pgxpool.NewWithConfig(ctx, pgConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Postgres: %w", err)
	}

	// Test connection
	if err := pgPool.Ping(ctx); err != nil {
		pgPool.Close()
		return nil, fmt.Errorf("failed to ping Postgres: %w", err)
	}

	// Create Antfly client
	antflyClient, err := antfly.NewAntflyClient(config.AntflyURL, http.DefaultClient)
	if err != nil {
		pgPool.Close()
		return nil, fmt.Errorf("failed to create Antfly client: %w", err)
	}

	// Create table if requested
	if config.CreateTable {
		err := antflyClient.CreateTable(ctx, config.AntflyTable, antfly.CreateTableRequest{
			NumShards: uint(config.ReplicationFactor),
		})
		if err != nil {
			log.Printf("Warning: Failed to create table (may already exist): %v", err)
		} else {
			fmt.Printf("✓ Created Antfly table '%s'\n", config.AntflyTable)
		}
	}

	return &PostgresSync{
		config:       config,
		pgPool:       pgPool,
		antflyClient: antflyClient,
		lastSyncTime: time.Now(),
	}, nil
}

// Close cleans up resources
func (ps *PostgresSync) Close() {
	if ps.pgPool != nil {
		ps.pgPool.Close()
	}
}

// ANCHOR: full_sync
// FullSync performs a complete sync of all data from Postgres to Antfly
func (ps *PostgresSync) FullSync(ctx context.Context) error {
	startTime := time.Now()

	// Query all records from Postgres
	query := fmt.Sprintf(`
		SELECT %s, %s
		FROM %s
		ORDER BY %s
	`, ps.config.IDColumn, ps.config.DataColumn, ps.config.TableName, ps.config.IDColumn)

	rows, err := ps.pgPool.Query(ctx, query)
	if err != nil {
		return fmt.Errorf("failed to query Postgres: %w", err)
	}
	defer rows.Close()

	// Collect records in batches
	var records []struct {
		ID   string
		Data map[string]any
	}

	for rows.Next() {
		var id string
		var data []byte

		if err := rows.Scan(&id, &data); err != nil {
			return fmt.Errorf("failed to scan row: %w", err)
		}

		var jsonData map[string]any
		if err := json.Unmarshal(data, &jsonData); err != nil {
			log.Printf("Warning: Failed to parse JSON for ID %s: %v", id, err)
			continue
		}

		records = append(records, struct {
			ID   string
			Data map[string]any
		}{ID: id, Data: jsonData})
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating rows: %w", err)
	}

	fmt.Printf("Full sync: Found %d records in Postgres\n", len(records))

	if len(records) == 0 {
		fmt.Println("No records to sync")
		ps.stats.mu.Lock()
		ps.stats.LastFullSync = time.Now()
		ps.stats.mu.Unlock()
		return nil
	}

	// Sync in batches using Linear Merge
	totalUpserted := 0
	totalSkipped := 0
	totalDeleted := 0
	cursor := ""

	for i := 0; i < len(records); i += ps.config.BatchSize {
		end := min(i+ps.config.BatchSize, len(records))
		batch := records[i:end]

		// Convert to Antfly records map
		antflyRecords := make(map[string]any)
		for _, rec := range batch {
			// Add metadata
			doc := make(map[string]any)
			doc["id"] = rec.ID
			doc["data"] = rec.Data
			doc["source"] = "postgres"
			doc["synced_at"] = time.Now().Format(time.RFC3339)

			antflyRecords[rec.ID] = doc
		}

		// Perform linear merge
		result, err := ps.antflyClient.LinearMerge(
			ctx,
			ps.config.AntflyTable,
			antfly.LinearMergeRequest{
				Records:      antflyRecords,
				LastMergedId: cursor,
				DryRun:       false,
			},
		)
		if err != nil {
			ps.stats.mu.Lock()
			ps.stats.TotalErrors++
			ps.stats.mu.Unlock()
			return fmt.Errorf("linear merge failed: %w", err)
		}

		totalUpserted += result.Upserted
		totalSkipped += result.Skipped
		totalDeleted += result.Deleted

		// Update cursor for next batch
		if result.NextCursor != "" {
			cursor = result.NextCursor
		} else {
			// Find max ID in batch
			maxID := ""
			for id := range antflyRecords {
				if id > maxID {
					maxID = id
				}
			}
			cursor = maxID
		}

		fmt.Printf("  Batch %d-%d: %d upserted, %d skipped, %d deleted\n",
			i+1, end, result.Upserted, result.Skipped, result.Deleted)
	}

	duration := time.Since(startTime)

	fmt.Printf("✓ Full sync complete: %d upserted, %d skipped, %d deleted in %v\n",
		totalUpserted, totalSkipped, totalDeleted, duration)

	// Update stats
	ps.stats.mu.Lock()
	ps.stats.TotalSynced += int64(totalUpserted)
	ps.stats.TotalSkipped += int64(totalSkipped)
	ps.stats.TotalDeleted += int64(totalDeleted)
	ps.stats.LastFullSync = time.Now()
	ps.stats.mu.Unlock()

	return nil
}

// ANCHOR_END: full_sync

// ANCHOR: realtime_sync
// StartRealtimeSync starts listening for Postgres notifications
func (ps *PostgresSync) StartRealtimeSync(ctx context.Context) error {
	// Use a dedicated connection for LISTEN
	conn, err := pgx.Connect(ctx, ps.config.PostgresURL)
	if err != nil {
		return fmt.Errorf("failed to create listen connection: %w", err)
	}
	defer func() {
		if err := conn.Close(ctx); err != nil {
			log.Printf("Warning: Failed to close connection: %v", err)
		}
	}()

	// Start listening for notifications
	channelName := ps.config.TableName + "_changes"
	_, err = conn.Exec(ctx, "LISTEN "+pgx.Identifier{channelName}.Sanitize())
	if err != nil {
		return fmt.Errorf("failed to LISTEN: %w", err)
	}

	fmt.Printf("✓ Listening on channel '%s'\n", channelName)

	// Buffer for batching rapid changes
	changeBatch := make(map[string]ChangeEvent)
	var batchMu sync.Mutex
	batchTicker := time.NewTicker(1 * time.Second)
	defer batchTicker.Stop()

	// Process batched changes
	processBatch := func() {
		batchMu.Lock()
		if len(changeBatch) == 0 {
			batchMu.Unlock()
			return
		}

		// Copy and clear batch
		toProcess := make(map[string]ChangeEvent, len(changeBatch))
		maps.Copy(toProcess, changeBatch)
		changeBatch = make(map[string]ChangeEvent)
		batchMu.Unlock()

		// Process the batch
		if err := ps.processBatchedChanges(ctx, toProcess); err != nil {
			log.Printf("Error processing batch: %v", err)
			ps.stats.mu.Lock()
			ps.stats.TotalErrors++
			ps.stats.mu.Unlock()
		}
	}

	// Goroutine to process batches periodically
	go func() {
		for {
			select {
			case <-batchTicker.C:
				processBatch()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Listen for notifications
	for {
		notification, err := conn.WaitForNotification(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return nil // Context cancelled, clean exit
			}
			return fmt.Errorf("notification error: %w", err)
		}

		// Parse the notification payload
		var event ChangeEvent
		if err := json.Unmarshal([]byte(notification.Payload), &event); err != nil {
			log.Printf("Warning: Failed to parse notification: %v", err)
			continue
		}

		// Add to batch
		batchMu.Lock()
		changeBatch[event.ID] = event
		batchMu.Unlock()

		fmt.Printf("← Change detected: %s %s\n", event.Operation, event.ID)
	}
}

// ANCHOR_END: realtime_sync

// ANCHOR: process_batched_changes
// processBatchedChanges syncs a batch of changes to Antfly
func (ps *PostgresSync) processBatchedChanges(
	ctx context.Context,
	changes map[string]ChangeEvent,
) error {
	if len(changes) == 0 {
		return nil
	}

	// Separate into upserts and deletes
	var deletes []string
	records := make(map[string]any)

	for id, event := range changes {
		if event.Operation == "DELETE" {
			deletes = append(deletes, id)
		} else {
			// For INSERT/UPDATE, fetch current data from Postgres
			// (the notification might not contain the full data)
			query := fmt.Sprintf(`SELECT %s FROM %s WHERE %s = $1`,
				ps.config.DataColumn, ps.config.TableName, ps.config.IDColumn)

			var data []byte
			err := ps.pgPool.QueryRow(ctx, query, id).Scan(&data)
			if err != nil {
				if err == pgx.ErrNoRows {
					// Record was deleted between notification and now
					deletes = append(deletes, id)
					continue
				}
				log.Printf("Warning: Failed to fetch data for %s: %v", id, err)
				continue
			}

			var jsonData map[string]any
			if err := json.Unmarshal(data, &jsonData); err != nil {
				log.Printf("Warning: Failed to parse JSON for %s: %v", id, err)
				continue
			}

			doc := make(map[string]any)
			doc["id"] = id
			doc["data"] = jsonData
			doc["source"] = "postgres"
			doc["synced_at"] = time.Now().Format(time.RFC3339)

			records[id] = doc
		}
	}

	// Handle upserts via Linear Merge
	if len(records) > 0 {
		result, err := ps.antflyClient.LinearMerge(
			ctx,
			ps.config.AntflyTable,
			antfly.LinearMergeRequest{
				Records:      records,
				LastMergedId: "",
				DryRun:       false,
			},
		)
		if err != nil {
			return fmt.Errorf("linear merge failed: %w", err)
		}

		fmt.Printf("→ Real-time sync: %d upserted, %d skipped\n",
			result.Upserted, result.Skipped)

		ps.stats.mu.Lock()
		ps.stats.TotalSynced += int64(result.Upserted)
		ps.stats.TotalSkipped += int64(result.Skipped)
		ps.stats.RealtimeUpdates += int64(len(records))
		ps.stats.LastRealtimeSync = time.Now()
		ps.stats.mu.Unlock()
	}

	// Handle deletes
	if len(deletes) > 0 {
		// Use Batch API to delete multiple documents at once
		batchResult, err := ps.antflyClient.Batch(ctx, ps.config.AntflyTable, antfly.BatchRequest{
			Deletes: deletes,
		})
		if err != nil {
			return fmt.Errorf("batch delete failed: %w", err)
		}

		deletedCount := batchResult.Deleted
		if len(batchResult.Failed) > 0 {
			log.Printf("Warning: %d delete operations failed", len(batchResult.Failed))
			for _, failed := range batchResult.Failed {
				log.Printf("  Failed to delete %s: %s", failed.Id, failed.Error)
			}
		}

		fmt.Printf("→ Real-time sync: %d deleted\n", deletedCount)

		ps.stats.mu.Lock()
		ps.stats.TotalDeleted += int64(deletedCount)
		ps.stats.RealtimeUpdates += int64(deletedCount)
		ps.stats.LastRealtimeSync = time.Now()
		ps.stats.mu.Unlock()
	}

	return nil
}

// ANCHOR_END: process_batched_changes

// PrintStats prints current sync statistics
func (ps *PostgresSync) PrintStats() {
	ps.stats.mu.RLock()
	defer ps.stats.mu.RUnlock()

	fmt.Printf("\n--- Sync Statistics ---\n")
	fmt.Printf("Total synced: %d\n", ps.stats.TotalSynced)
	fmt.Printf("Total skipped: %d\n", ps.stats.TotalSkipped)
	fmt.Printf("Total deleted: %d\n", ps.stats.TotalDeleted)
	fmt.Printf("Real-time updates: %d\n", ps.stats.RealtimeUpdates)
	fmt.Printf("Errors: %d\n", ps.stats.TotalErrors)
	if !ps.stats.LastFullSync.IsZero() {
		fmt.Printf("Last full sync: %v ago\n", time.Since(ps.stats.LastFullSync).Round(time.Second))
	}
	if !ps.stats.LastRealtimeSync.IsZero() {
		fmt.Printf(
			"Last real-time sync: %v ago\n",
			time.Since(ps.stats.LastRealtimeSync).Round(time.Second),
		)
	}
	fmt.Printf("----------------------\n\n")
}

func parseFlags() Config {
	config := Config{}

	flag.StringVar(
		&config.PostgresURL,
		"postgres",
		os.Getenv("POSTGRES_URL"),
		"Postgres connection URL",
	)
	flag.StringVar(&config.AntflyURL, "antfly", "http://localhost:8080/api/v1", "Antfly API URL")
	flag.StringVar(&config.TableName, "pg-table", "documents", "Postgres table name")
	flag.StringVar(&config.IDColumn, "id-column", "id", "ID column name")
	flag.StringVar(&config.DataColumn, "data-column", "data", "JSONB data column name")
	flag.StringVar(&config.AntflyTable, "antfly-table", "postgres_docs", "Antfly table name")
	flag.DurationVar(
		&config.FullSyncInterval,
		"full-sync-interval",
		5*time.Minute,
		"Full sync interval (0 to disable)",
	)
	flag.IntVar(&config.BatchSize, "batch-size", 1000, "Batch size for sync")
	flag.BoolVar(
		&config.CreateTable,
		"create-table",
		false,
		"Create Antfly table if it doesn't exist",
	)
	flag.IntVar(&config.ReplicationFactor, "num-shards", 3, "Number of shards for new table")

	flag.Parse()

	if config.PostgresURL == "" {
		log.Fatal("Error: --postgres or POSTGRES_URL environment variable is required")
	}

	return config
}

// maskPassword hides the password component of a connection URL for display
func maskPassword(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil || u.User == nil {
		return rawURL
	}
	if _, hasPassword := u.User.Password(); hasPassword {
		u.User = url.UserPassword(u.User.Username(), "***")
	}
	return u.String()
}