docsaf is a tool that syncs documentation files (Markdown, MDX, and OpenAPI specs) to Antfly using the Linear Merge API, with automatic change detection and type separation.

What is Linear Merge?#

The Linear Merge API is a stateless, progressive synchronization API that:

  • Efficiently syncs external data sources to Antfly tables
  • Detects changes using content hashing (skips unchanged documents)
  • Auto-deletes documents removed from the source
  • Handles shard boundaries with cursor-based pagination
  • Supports dry-run to preview changes before applying

Perfect for syncing documentation, configuration files, or any external data source.
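To make the request shape concrete, here is a rough local mirror of the merge request used by the client code later in this document (`antfly.LinearMergeRequest`); the JSON tag names are assumptions for illustration, not the library's actual wire format:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LinearMergeRequest is a hypothetical local mirror of the request struct
// used by the antfly client code shown later in this document.
// The json tags here are assumptions.
type LinearMergeRequest struct {
	Records      map[string]any `json:"records"`
	LastMergedId string         `json:"last_merged_id,omitempty"` // cursor from the previous batch
	DryRun       bool           `json:"dry_run,omitempty"`        // preview without writing
}

func main() {
	req := LinearMergeRequest{
		Records:      map[string]any{"doc_1": map[string]any{"title": "Intro"}},
		LastMergedId: "", // empty cursor: start from the beginning of the table
		DryRun:       true,
	}
	out, _ := json.Marshal(req)
	fmt.Println(string(out))
}
```

Each call carries one sorted batch of records plus the cursor from the previous call; the server handles skip/upsert/delete decisions for the covered ID range.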

Features#

  1. Multi-format support: Processes Markdown (.md), MDX (.mdx), and OpenAPI (.yaml, .yml, .json) files
  2. Smart chunking: Uses goldmark to parse Markdown/MDX into sections by headings
  3. Frontmatter parsing: Extracts YAML frontmatter (title, description, etc.) from MDX files
  4. Wildcard filtering: Include/exclude files using glob patterns with ** support
  5. OpenAPI parsing: Uses libopenapi to extract paths, schemas, and info from OpenAPI specs
  6. Type separation: Different document types are stored with distinct _type values for targeted querying
  7. Content hashing: Automatically skips unchanged sections on re-sync
  8. Incremental updates: Only updates modified sections
  9. Deletion detection: Removes sections that no longer exist in source
  10. Dry run mode: Preview what will change before committing

Document Types#

docsaf creates documents with the following _type values:

| Type | Description | Source |
| --- | --- | --- |
| markdown_section | Sections from .md files | Chunked by headings using goldmark |
| mdx_section | Sections from .mdx files | Chunked by headings using goldmark |
| openapi_info | API information | Info object from OpenAPI specs |
| openapi_path | API path operations | GET /users, POST /orders, etc. |
| openapi_schema | Data schemas | Component schemas from OpenAPI specs |

Each type has its own schema with specific metadata fields. See schemas.yaml for details.

Prerequisites#

  1. Antfly running locally:

    cd /path/to/antfly
    go run ./cmd/antfly swarm
  2. Build docsaf (from antfly root):

    go build -o docsaf ./examples/docsaf

Usage#

docsaf has three subcommands:

  1. prepare - Process files and create sorted JSON data
  2. load - Load prepared JSON data into Antfly
  3. sync - Full pipeline (prepare + load in one step)

Subcommand: prepare#

Process documentation files and save to JSON:

./docsaf prepare \
  --dir /path/to/your/docs \
  --output docs.json

Flags:

  • --dir (required) - Path to directory containing documentation files
  • --output - Output JSON file path (default: docs.json)
  • --base-url - Base URL for generating document links (e.g., https://docs.example.com)
  • --include - Include pattern (can be repeated, supports ** wildcards)
  • --exclude - Exclude pattern (can be repeated, supports ** wildcards)
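The `**` semantics can be sketched by translating a glob into a regular expression: `**` crosses path separators while a single `*` stays within one segment. This is an illustration of the matching rules, not the tool's actual matcher:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// globToRegexp is an illustrative translation of include/exclude globs:
// "**" matches across path separators, "*" matches within one segment.
func globToRegexp(glob string) *regexp.Regexp {
	var b strings.Builder
	b.WriteString("^")
	for i := 0; i < len(glob); i++ {
		switch {
		case strings.HasPrefix(glob[i:], "**"):
			b.WriteString(".*")
			i++ // consume the second '*'
		case glob[i] == '*':
			b.WriteString("[^/]*")
		default:
			b.WriteString(regexp.QuoteMeta(string(glob[i])))
		}
	}
	b.WriteString("$")
	return regexp.MustCompile(b.String())
}

func main() {
	re := globToRegexp("**/content/**")
	fmt.Println(re.MatchString("www/content/docs/index.mdx")) // true
	fmt.Println(re.MatchString("www/app/page.tsx"))           // false
}
```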

Subcommand: load#

Load prepared JSON data into Antfly:

./docsaf load \
  --input docs.json \
  --table docs \
  --create-table

Dry run to preview changes:

./docsaf load \
  --input docs.json \
  --table docs \
  --dry-run

Flags:

  • --input - Input JSON file path (default: docs.json)
  • --url - Antfly API URL (default: http://localhost:8080/api/v1)
  • --table - Table name to merge into (default: docs)
  • --create-table - Create table if it doesn't exist (default: false)
  • --num-shards - Number of shards for new table (default: 1)
  • --batch-size - Linear merge batch size (default: 25)
  • --embedding-model - Embedding model to use (default: embeddinggemma)
  • --chunker-model - Chunker model: fixed-bert-tokenizer, fixed-bpe-tokenizer, or any ONNX model directory name (default: fixed-bert-tokenizer)
  • --target-tokens - Target tokens for chunking (default: 512)
  • --overlap-tokens - Overlap tokens for chunking (default: 50)
  • --dry-run - Preview changes without applying them (default: false)

Subcommand: sync#

Full pipeline: process files and load them in one step:

./docsaf sync \
  --dir /path/to/your/docs \
  --table docs \
  --create-table

Flags:

  • --dir (required) - Path to directory containing documentation files
  • --url - Antfly API URL (default: http://localhost:8080/api/v1)
  • --table - Table name to merge into (default: docs)
  • --base-url - Base URL for generating document links (e.g., https://docs.example.com)
  • --create-table - Create table if it doesn't exist (default: false)
  • --num-shards - Number of shards for new table (default: 1)
  • --batch-size - Linear merge batch size (default: 25)
  • --embedding-model - Embedding model to use (default: embeddinggemma)
  • --chunker-model - Chunker model: fixed-bert-tokenizer, fixed-bpe-tokenizer, or any ONNX model directory name (default: fixed-bert-tokenizer)
  • --target-tokens - Target tokens for chunking (default: 512)
  • --overlap-tokens - Overlap tokens for chunking (default: 50)
  • --dry-run - Preview changes without applying them (default: false)
  • --include - Include pattern (can be repeated, supports ** wildcards)
  • --exclude - Exclude pattern (can be repeated, supports ** wildcards)

Example Workflows#

1. Two-Step Workflow (Prepare + Load)#

Step 1: Prepare the data

# Process files and create JSON
./docsaf prepare \
  --dir ./my-docs \
  --output my-docs.json

# Output:
# === docsaf prepare - Process Documentation Files ===
# ✓ Found 127 document sections
#
# Document types found:
#   - markdown_section: 45
#   - openapi_path: 52
#   - openapi_schema: 24
#   - openapi_info: 6
#
# ✓ Prepared data written to my-docs.json

Step 2: Load into Antfly

# Load the prepared JSON data
./docsaf load \
  --input my-docs.json \
  --table docs \
  --create-table

# Output:
# === docsaf load - Load Data to Antfly ===
# ✓ Loaded 127 records
# Upserted: 127
# Skipped: 0
# Deleted: 0

Benefits of two-step workflow:

  • Separate data processing from loading
  • Can version control the JSON file
  • Can manually inspect/modify the JSON before loading
  • Can load the same data to multiple tables/clusters

2. One-Step Workflow (Sync)#

# Process and load in one command
./docsaf sync \
  --dir ./my-docs \
  --table docs \
  --create-table

# Output:
# === docsaf sync - Full Pipeline ===
# ✓ Found 127 document sections
# Upserted: 127
# Skipped: 0
# Deleted: 0

3. Re-sync (No Changes)#

# Second run - nothing changed
./docsaf sync --dir ./my-docs --table docs

# Output:
# ✓ Found 127 document sections
# Upserted: 0
# Skipped: 127 (unchanged)
# Deleted: 0

The content hash optimization means unchanged documents are skipped entirely - no expensive writes!

4. Incremental Update#

After editing a file:

./docsaf sync --dir ./my-docs --table docs

# Output:
# ✓ Found 127 document sections
# Upserted: 3 (updated sections)
# Skipped: 124 (unchanged)
# Deleted: 0

Only the modified sections are updated!

5. Sync Antfly's Own Documentation#

# From the antfly repository root
./docsaf sync \
  --dir . \
  --create-table \
  --table antfly_docs

This will process:

  • All markdown files (README.md, work-log/*.md, etc.)
  • MDX documentation
  • OpenAPI specs in src/metadata/api.yaml and src/usermgr/api.yaml

6. Sync www/ Website Documentation (with Wildcards)#

# From the antfly repository root
# Using exclude patterns to skip build artifacts and code
./docsaf sync \
  --dir www \
  --exclude "**/node_modules/**" \
  --exclude "**/.next/**" \
  --exclude "**/out/**" \
  --exclude "**/work-log/**" \
  --exclude "**/scripts/**" \
  --exclude "**/config/**" \
  --exclude "**/components/**" \
  --exclude "**/app/**" \
  --exclude "**/public/**" \
  --table antfly_website_docs \
  --create-table

Or, more simply using include patterns:

# Only index files in the content directory
./docsaf sync \
  --dir www \
  --include "**/content/**" \
  --table antfly_website_docs \
  --create-table

This will:

  • Extract frontmatter (title, description) from MDX files
  • Process 22 MDX documentation files in www/content/docs/
  • Skip auto-generated navigation and build artifacts
  • Create sections with frontmatter metadata

How It Works#

1. File Discovery#

The RepositoryTraverser walks the directory tree and identifies files by extension:

  • .md, .mdx → MarkdownProcessor
  • .yaml, .yml, .json → OpenAPIProcessor (if valid OpenAPI)

2. Markdown/MDX Processing (goldmark)#

Using goldmark's AST parser:

# Main Title           → Section 1 (markdown_section)
Content here...

## Subsection A        → Section 2 (markdown_section)
More content...

## Subsection B        → Section 3 (markdown_section)
Even more content...
  1. Parse the document into an Abstract Syntax Tree
  2. Walk the AST and identify heading nodes
  3. Split content into sections at each heading
  4. Create a DocumentSection for each section with:
    • Unique ID (hash of file path + heading)
    • Title (heading text)
    • Content (section text with markdown formatting)
    • Metadata (heading level, is_mdx flag)
    • Type: markdown_section or mdx_section
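The splitting step can be sketched with a simplified, line-based stand-in for the goldmark AST walk (the real processor walks the parsed AST, which also handles headings inside code fences correctly):

```go
package main

import (
	"fmt"
	"strings"
)

// section is a simplified stand-in for a DocumentSection.
type section struct {
	Title   string
	Level   int
	Content []string
}

// splitByHeadings splits a markdown document into sections at each
// heading line. Content before the first heading is ignored here.
func splitByHeadings(doc string) []section {
	var sections []section
	for _, line := range strings.Split(doc, "\n") {
		if strings.HasPrefix(line, "#") {
			level := len(line) - len(strings.TrimLeft(line, "#"))
			title := strings.TrimSpace(line[level:])
			sections = append(sections, section{Title: title, Level: level})
		} else if len(sections) > 0 {
			s := &sections[len(sections)-1]
			s.Content = append(s.Content, line)
		}
	}
	return sections
}

func main() {
	doc := "# Main Title\nContent here...\n## Subsection A\nMore content...\n## Subsection B\nEven more content..."
	for _, s := range splitByHeadings(doc) {
		fmt.Printf("level=%d title=%q\n", s.Level, s.Title)
	}
}
```

This produces three sections, matching the diagram above.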

3. OpenAPI Processing (libopenapi)#

Using libopenapi to parse OpenAPI v3 specifications:

Input: api.yaml with paths, schemas, info

Output:

  1. Info document (openapi_info): API title, version, description
  2. Path documents (openapi_path): One per operation (GET /users, POST /orders)
  3. Schema documents (openapi_schema): One per component schema (User, Order)

Each document includes rich metadata (HTTP method, tags, operation ID, etc.)
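For example, a minimal (hypothetical) spec like the one below would yield one openapi_info document, one openapi_path document for `get /users`, and one openapi_schema document for `User`:

```yaml
openapi: "3.0.3"
info:
  title: Example API
  version: "1.0.0"
paths:
  /users:
    get:
      operationId: listUsers
      responses:
        "200":
          description: OK
components:
  schemas:
    User:
      type: object
      properties:
        id:
          type: string
```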

4. Linear Merge Process#

The core sync logic uses batched Linear Merge with cursor-based pagination:

// performBatchedLinearMerge performs LinearMerge in batches with progress logging.
// Returns the final cursor position after processing all batches.
func performBatchedLinearMerge(
	ctx context.Context,
	client *antfly.AntflyClient,
	tableName string,
	records map[string]any,
	batchSize int,
	dryRun bool,
) (string, error) {
	// Sort IDs for deterministic pagination (REQUIRED for linear merge!)
	ids := make([]string, 0, len(records))
	for id := range records {
		ids = append(ids, id)
	}
	// CRITICAL: Must sort IDs for linear merge cursor logic to work correctly
	sortedIDs := make([]string, len(ids))
	copy(sortedIDs, ids)
	// Use slices.Sort from Go 1.21+
	// If on older Go, use: sort.Strings(sortedIDs)
	slices.Sort(sortedIDs)

	totalBatches := (len(sortedIDs) + batchSize - 1) / batchSize
	fmt.Printf("Loading %d documents in %d batches of %d records\n", len(sortedIDs), totalBatches, batchSize)
	// ... (remaining 71 lines elided)

┌─────────────────────┐
│ Documentation Files │
│ (.md, .mdx, .yaml)  │
└──────────┬──────────┘
           │ 1. Discover & process files
┌─────────────────────┐
│ Document Sections   │ {id_1: doc_1, id_2: doc_2, ...}
│ (with _type)        │
└──────────┬──────────┘
           │ 2. Linear Merge API
┌─────────────────────┐
│ For each range:     │
│  - Scan storage     │
│  - Compare hash     │
│  - Upsert/Skip      │
│  - Delete old       │
└──────────┬──────────┘
           │ 3. Result
┌─────────────────────┐
│ Antfly Table        │ Synced with source!
│ (typed documents)   │
└─────────────────────┘

5. Content Hash Optimization#

Each document's content is hashed:

hash := xxhash(canonical_json(document))

If the hash matches what's already in storage → SKIP (no write needed!)
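The skip-unchanged check can be sketched as follows. The real implementation uses xxhash; SHA-256 from the standard library stands in here. Go's `encoding/json` emits map keys in sorted order, which gives a canonical serialization for flat documents:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// contentHash is a sketch of the content-hash check, with SHA-256
// standing in for xxhash. json.Marshal sorts map keys, so key order
// does not affect the hash.
func contentHash(doc map[string]any) string {
	canonical, _ := json.Marshal(doc)
	sum := sha256.Sum256(canonical)
	return hex.EncodeToString(sum[:])
}

func main() {
	stored := contentHash(map[string]any{"title": "Auth", "content": "..."})
	incoming := contentHash(map[string]any{"content": "...", "title": "Auth"})
	fmt.Println(stored == incoming) // same content, different key order → same hash
}
```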

6. Auto-Deletion#

Documents in the table but not in the merge request are automatically deleted:

Storage:  [doc_1, doc_2, doc_3, doc_4]
Request:  [doc_1, doc_2, doc_5]        ← doc_5 is new

Result:
  - doc_1: SKIP (unchanged)
  - doc_2: SKIP (unchanged)
  - doc_3: DELETE (not in request)
  - doc_4: DELETE (not in request)
  - doc_5: UPSERT (new)
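The decision rule above can be sketched as a plain function over stored and requested hashes (a hypothetical helper; the real server applies this per ID range as it walks the sorted keys):

```go
package main

import (
	"fmt"
	"sort"
)

// mergeDecisions sketches the linear-merge rule: request IDs are upserted
// when new or changed, skipped when the hash matches, and storage IDs
// absent from the request are deleted.
func mergeDecisions(storage, request map[string]string) map[string]string {
	decisions := map[string]string{}
	for id, hash := range request {
		if stored, ok := storage[id]; !ok || stored != hash {
			decisions[id] = "UPSERT"
		} else {
			decisions[id] = "SKIP"
		}
	}
	for id := range storage {
		if _, ok := request[id]; !ok {
			decisions[id] = "DELETE"
		}
	}
	return decisions
}

func main() {
	storage := map[string]string{"doc_1": "h1", "doc_2": "h2", "doc_3": "h3", "doc_4": "h4"}
	request := map[string]string{"doc_1": "h1", "doc_2": "h2", "doc_5": "h5"}
	d := mergeDecisions(storage, request)
	ids := make([]string, 0, len(d))
	for id := range d {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Println(id, d[id]) // reproduces the result table above
	}
}
```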

Querying the Data#

Once ingested, you can query by document type:

Search all markdown sections#

curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "match": {
        "_type": "markdown_section"
      }
    },
    "full_text_query": "authentication"
  }'

Search OpenAPI paths#

curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "match": {
        "_type": "openapi_path"
      }
    },
    "full_text_query": "users"
  }'

Filter by HTTP method#

curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "bool": {
        "must": [
          {"match": {"_type": "openapi_path"}},
          {"match": {"metadata.http_method": "get"}}
        ]
      }
    }
  }'

Get OpenAPI schemas#

curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "match": {
        "_type": "openapi_schema"
      }
    }
  }'

Document Schemas#

The schemas.yaml file contains example schema definitions for each document type with Antfly's custom annotations:

  • x-antfly-types: Specify field types (text, keyword, embedding, etc.)
  • x-antfly-index: Enable/disable indexing for a field
  • x-antfly-include-in-all: Include fields in the Bleve _all field for full-text search

Example for markdown_section:

markdown_section:
  type: object
  x-antfly-include-in-all:
    - title
    - content
  properties:
    title:
      type: string
      x-antfly-types:
        - text
        - keyword
    content:
      type: string
      x-antfly-types:
        - text
    _type:
      type: string
      x-antfly-types:
        - keyword

Advanced Features#

Hybrid Search with Embeddings#

Add embedding enricher to enable semantic search. Here's how docsaf creates embedding indexes:

// createEmbeddingIndex creates an embedding index configuration with chunking
func createEmbeddingIndex(embeddingModel, chunkerModel string, targetTokens, overlapTokens int) (*antfly.IndexConfig, error) {
	embeddingIndexConfig := antfly.IndexConfig{
		Name: "embeddings",
		Type: antfly.IndexTypeEmbeddings,
	}

	// Configure embedder
	embedder, err := antfly.NewEmbedderConfig(antfly.OllamaEmbedderConfig{
		Model: embeddingModel,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure embedder: %w", err)
	}

	// Configure chunker via Termite
	// Model can be "fixed-bert-tokenizer", "fixed-bpe-tokenizer", or any ONNX model directory name
	chunker := antfly.ChunkerConfig{}
	err = chunker.FromTermiteChunkerConfig(antfly.TermiteChunkerConfig{
		Model:         chunkerModel,
		TargetTokens:  targetTokens,
		OverlapTokens: overlapTokens,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure chunker: %w", err)
	}
	// ... (remaining 16 lines elided)

You can also manually configure indexes:

// Add indexes to the table
client.UpdateTable(ctx, "docs", antfly.UpdateTableRequest{
    Indexes: []antfly.IndexConfig{
        {
            Type: "full_text_v0",
            Name: "bleve",
        },
        {
            Type: "aknn_v0",
            Name: "embeddings",
            Config: map[string]any{
                "embedding_field": "content_embedding",
            },
        },
        {
            Type: "embeddingenricher",
            Name: "enricher",
            Config: map[string]any{
                "text_fields": []string{"content"},
                "output_field": "content_embedding",
                "model": "text-embedding-3-small",
            },
        },
    },
})

Incremental Updates (Cron)#

Run docsaf periodically to sync changes:

# Cron job: sync docs every hour
0 * * * * cd /path/to/antfly && ./docsaf sync --dir /path/to/docs --table docs

Or use the two-step workflow to separate data processing from loading:

# Cron job: prepare data every hour, load separately
0 * * * * cd /path/to/antfly && ./docsaf prepare --dir /path/to/docs --output /tmp/docs.json
15 * * * * cd /path/to/antfly && ./docsaf load --input /tmp/docs.json --table docs

Only new or modified files will be updated.

Filter by File Type#

Query specific document types:

# Only MDX files
curl ... -d '{"query": {"match": {"_type": "mdx_section"}}}'

# Only OpenAPI paths with tag "users"
curl ... -d '{
  "query": {
    "bool": {
      "must": [
        {"match": {"_type": "openapi_path"}},
        {"match": {"metadata.tags": "users"}}
      ]
    }
  }
}'

Implementation Details#

ID Generation#

Document IDs are generated using SHA-256 hashes:

doc_<hash(file_path + identifier)[:16]>

This ensures:

  • Stable IDs across runs (same file → same ID)
  • Uniqueness within the repository
  • Efficient lookups
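A sketch of this ID scheme (the exact concatenation of path and identifier is an assumption; the real tool may join the parts differently):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// docID sketches the ID scheme: SHA-256 of file path plus section
// identifier, truncated to 16 hex characters, with a "doc_" prefix.
func docID(filePath, identifier string) string {
	sum := sha256.Sum256([]byte(filePath + identifier))
	return "doc_" + hex.EncodeToString(sum[:])[:16]
}

func main() {
	fmt.Println(docID("docs/auth.md", "## Tokens"))
	// Same inputs always yield the same ID, so re-syncs are stable.
	fmt.Println(docID("docs/auth.md", "## Tokens") == docID("docs/auth.md", "## Tokens"))
}
```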

Error Handling#

  • Invalid OpenAPI files are logged and skipped (they don't fail the entire run)
  • File read errors are logged and skipped
  • Parse errors are logged with file path for debugging

Performance Characteristics#

| Operation | Complexity | Notes |
| --- | --- | --- |
| Initial import | O(n) writes | All sections inserted |
| Re-sync (unchanged) | O(n) reads, 0 writes | Hash comparison only |
| Incremental update | O(n) reads, O(k) writes | k = changed sections |
| Deletion scan | O(n) reads | Identifies removed sections |

Batch size recommendations:

  • Small directories: 100-500 sections/request
  • Medium directories: 1,000-5,000 sections/request
  • Large directories: 10,000+ sections/request (may span multiple batches)

Troubleshooting#

Table doesn't exist:

# Add --create-table flag
./docsaf sync --dir . --create-table

Connection refused:

# Make sure Antfly is running
go run ./cmd/antfly swarm

No documents found:

  • Ensure the directory contains .md, .mdx, or valid OpenAPI spec files
  • Check file extensions (case-insensitive)
  • OpenAPI files must be valid v3 specifications

OpenAPI parsing errors:

  • Verify the file is a valid OpenAPI 3.x specification
  • Use a validator like swagger-editor
  • Check for YAML/JSON syntax errors
  • Note: Invalid OpenAPI files are logged and skipped

Real-World Use Cases#

  1. Documentation Sync: Keep Antfly in sync with documentation directories (Markdown, MDX, OpenAPI)
  2. API Documentation: Ingest OpenAPI specs for searchable API reference
  3. Knowledge Base: Import and maintain wiki/docs from external sources
  4. Multi-Format Search: Search across markdown docs and API specs simultaneously
  5. Developer Portal: Build searchable developer documentation from multiple sources

Next Steps#

  • Add embedding generation to enable semantic search across all document types
  • Implement Git integration to track document versions and authors
  • Add support for more file formats (reStructuredText, AsciiDoc, etc.)
  • Create webhooks for automatic re-sync on file changes
  • Build a UI for browsing documents by type

API Reference#

See the full Linear Merge API documentation.

Project Files#

640 lines
1package main
2
3import (
4	"context"
5	"encoding/json"
6	"flag"
7	"fmt"
8	"log"
9	"net/http"
10	"os"
11	"slices"
12	"strings"
13	"time"
14
15	"github.com/antflydb/antfly-go/antfly"
16	"github.com/antflydb/antfly-go/docsaf"
17)
18
19// StringSliceFlag allows repeated flags to build a slice
20type StringSliceFlag []string
21
22func (s *StringSliceFlag) String() string {
23	return strings.Join(*s, ", ")
24}
25
26func (s *StringSliceFlag) Set(value string) error {
27	*s = append(*s, value)
28	return nil
29}
30
31// ANCHOR: prepare_cmd
32func prepareCmd(args []string) error {
33	fs := flag.NewFlagSet("prepare", flag.ExitOnError)
34	dirPath := fs.String("dir", "", "Path to directory containing documentation files (required)")
35	outputFile := fs.String("output", "docs.json", "Output JSON file path")
36	baseURL := fs.String("base-url", "", "Base URL for generating document links (optional)")
37
38	var includePatterns StringSliceFlag
39	var excludePatterns StringSliceFlag
40	fs.Var(&includePatterns, "include", "Include pattern (can be repeated, supports ** wildcards)")
41	fs.Var(&excludePatterns, "exclude", "Exclude pattern (can be repeated, supports ** wildcards)")
42
43	if err := fs.Parse(args); err != nil {
44		return fmt.Errorf("failed to parse flags: %w", err)
45	}
46
47	if *dirPath == "" {
48		return fmt.Errorf("--dir flag is required")
49	}
50
51	// Verify path exists and is a directory
52	fileInfo, err := os.Stat(*dirPath)
53	if err != nil {
54		return fmt.Errorf("failed to access path: %w", err)
55	}
56
57	if !fileInfo.IsDir() {
58		return fmt.Errorf("--dir must be a directory")
59	}
60
61	fmt.Printf("=== docsaf prepare - Process Documentation Files ===\n")
62	fmt.Printf("Directory: %s\n", *dirPath)
63	fmt.Printf("Output: %s\n", *outputFile)
64	if len(includePatterns) > 0 {
65		fmt.Printf("Include patterns: %v\n", includePatterns)
66	}
67	if len(excludePatterns) > 0 {
68		fmt.Printf("Exclude patterns: %v\n", excludePatterns)
69	}
70	fmt.Printf("\n")
71
72	// Create filesystem source and processor using library
73	source := docsaf.NewFilesystemSource(docsaf.FilesystemSourceConfig{
74		BaseDir:         *dirPath,
75		BaseURL:         *baseURL,
76		IncludePatterns: includePatterns,
77		ExcludePatterns: excludePatterns,
78	})
79	processor := docsaf.NewProcessor(source, docsaf.DefaultRegistry())
80
81	// Process all files in the directory
82	fmt.Printf("Processing documentation files (chunking by markdown headings)...\n")
83	sections, err := processor.Process(context.Background())
84	if err != nil {
85		return fmt.Errorf("failed to process directory: %w", err)
86	}
87
88	fmt.Printf("✓ Found %d documents\n\n", len(sections))
89
90	if len(sections) == 0 {
91		return fmt.Errorf("no supported files found in directory")
92	}
93
94	// Count sections by type
95	typeCounts := make(map[string]int)
96	for _, section := range sections {
97		typeCounts[section.Type]++
98	}
99
100	fmt.Printf("Document types found:\n")
101	for docType, count := range typeCounts {
102		fmt.Printf("  - %s: %d\n", docType, count)
103	}
104	fmt.Printf("\n")
105
106	// Show sample of documents
107	fmt.Printf("Sample documents:\n")
108	for i, section := range sections {
109		if i >= 10 {
110			fmt.Printf("  ... and %d more\n", len(sections)-i)
111			break
112		}
113		fmt.Printf("  [%d] %s (%s) - %s\n",
114			i+1, section.Title, section.Type, section.FilePath)
115	}
116	fmt.Printf("\n")
117
118	// Convert sections to records map (sorted by ID for consistent ordering)
119	records := make(map[string]any)
120	for _, section := range sections {
121		records[section.ID] = section.ToDocument()
122	}
123
124	// Write to JSON file
125	fmt.Printf("Writing %d records to %s...\n", len(records), *outputFile)
126	jsonData, err := json.MarshalIndent(records, "", "  ")
127	if err != nil {
128		return fmt.Errorf("failed to marshal JSON: %w", err)
129	}
130
131	err = os.WriteFile(*outputFile, jsonData, 0644)
132	if err != nil {
133		return fmt.Errorf("failed to write file: %w", err)
134	}
135
136	fmt.Printf("✓ Prepared data written to %s\n", *outputFile)
137	return nil
138}
139
140// ANCHOR_END: prepare_cmd
141
142// ANCHOR: load_cmd
143func loadCmd(args []string) error {
144	fs := flag.NewFlagSet("load", flag.ExitOnError)
145	antflyURL := fs.String("url", "http://localhost:8080/api/v1", "Antfly API URL")
146	tableName := fs.String("table", "docs", "Table name to merge into")
147	inputFile := fs.String("input", "docs.json", "Input JSON file path")
148	dryRun := fs.Bool("dry-run", false, "Preview changes without applying them")
149	createTable := fs.Bool("create-table", false, "Create table if it doesn't exist")
150	numShards := fs.Int("num-shards", 1, "Number of shards for new table")
151	batchSize := fs.Int("batch-size", 25, "Linear merge batch size")
152	embeddingModel := fs.String("embedding-model", "embeddinggemma", "Embedding model to use (e.g., embeddinggemma)")
153	chunkerModel := fs.String("chunker-model", "fixed-bert-tokenizer", "Chunker model: fixed-bert-tokenizer, fixed-bpe-tokenizer, or any ONNX model directory name")
154	targetTokens := fs.Int("target-tokens", 512, "Target tokens for chunking")
155	overlapTokens := fs.Int("overlap-tokens", 50, "Overlap tokens for chunking")
156	if err := fs.Parse(args); err != nil {
157		return fmt.Errorf("failed to parse flags: %w", err)
158	}
159
160	ctx := context.Background()
161
162	// Create Antfly client
163	client, err := antfly.NewAntflyClient(*antflyURL, http.DefaultClient)
164	if err != nil {
165		return fmt.Errorf("failed to create Antfly client: %w", err)
166	}
167
168	fmt.Printf("=== docsaf load - Load Data to Antfly ===\n")
169	fmt.Printf("Antfly URL: %s\n", *antflyURL)
170	fmt.Printf("Table: %s\n", *tableName)
171	fmt.Printf("Input: %s\n", *inputFile)
172	fmt.Printf("Dry run: %v\n\n", *dryRun)
173
174	// Create table if requested
175	if *createTable {
176		fmt.Printf("Creating table '%s' with %d shards...\n", *tableName, *numShards)
177
178		// Create embedding index configuration
179		embeddingIndex, err := createEmbeddingIndex(*embeddingModel, *chunkerModel, *targetTokens, *overlapTokens)
180		if err != nil {
181			return fmt.Errorf("failed to create embedding index config: %w", err)
182		}
183
184		err = client.CreateTable(ctx, *tableName, antfly.CreateTableRequest{
185			NumShards: uint(*numShards),
186			Indexes: map[string]antfly.IndexConfig{
187				"embeddings": *embeddingIndex,
188			},
189		})
190		if err != nil {
191			log.Printf("Warning: Failed to create table (may already exist): %v\n", err)
192		} else {
193			fmt.Printf("✓ Table created with BM25 and embedding indexes (%s with %s chunking)\n\n",
194				*embeddingModel, *chunkerModel)
195		}
196
197		// Wait for shards to be ready
198		if err := waitForShardsReady(ctx, client, *tableName, 30*time.Second); err != nil {
199			return fmt.Errorf("error waiting for shards: %w", err)
200		}
201	}
202
203	// Read JSON file
204	fmt.Printf("Reading records from %s...\n", *inputFile)
205	jsonData, err := os.ReadFile(*inputFile)
206	if err != nil {
207		return fmt.Errorf("failed to read file: %w", err)
208	}
209
210	var records map[string]any
211	err = json.Unmarshal(jsonData, &records)
212	if err != nil {
213		return fmt.Errorf("failed to unmarshal JSON: %w", err)
214	}
215
216	fmt.Printf("✓ Loaded %d records\n\n", len(records))
217
218	// Perform batched linear merge
219	finalCursor, err := performBatchedLinearMerge(ctx, client, *tableName, records, *batchSize, *dryRun)
220	if err != nil {
221		return fmt.Errorf("batched linear merge failed: %w", err)
222	}
223
224	// Final cleanup: delete any remaining documents beyond the last cursor
225	if finalCursor != "" && !*dryRun {
226		fmt.Printf("\nPerforming final cleanup to remove orphaned documents...\n")
227		cleanupResult, err := client.LinearMerge(ctx, *tableName, antfly.LinearMergeRequest{
228			Records:      map[string]any{}, // Empty records
229			LastMergedId: finalCursor,      // Start from last cursor
230			DryRun:       false,
231			SyncLevel:    antfly.SyncLevelAknn,
232		})
233		if err != nil {
234			return fmt.Errorf("final cleanup failed: %w", err)
235		}
236		fmt.Printf("✓ Final cleanup completed in %s\n", cleanupResult.Took)
237		fmt.Printf("  Deleted: %d orphaned documents\n", cleanupResult.Deleted)
238	}
239
240	fmt.Printf("\n✓ Load completed successfully\n")
241	return nil
242}
243
244// ANCHOR_END: load_cmd
245
246// ANCHOR: sync_cmd
247func syncCmd(args []string) error {
248	fs := flag.NewFlagSet("sync", flag.ExitOnError)
249	antflyURL := fs.String("url", "http://localhost:8080/api/v1", "Antfly API URL")
250	tableName := fs.String("table", "docs", "Table name to merge into")
251	dirPath := fs.String("dir", "", "Path to directory containing documentation files (required)")
252	baseURL := fs.String("base-url", "", "Base URL for generating document links (optional)")
253	dryRun := fs.Bool("dry-run", false, "Preview changes without applying them")
254	createTable := fs.Bool("create-table", false, "Create table if it doesn't exist")
255	numShards := fs.Int("num-shards", 1, "Number of shards for new table")
256	batchSize := fs.Int("batch-size", 25, "Linear merge batch size")
257	embeddingModel := fs.String("embedding-model", "embeddinggemma", "Embedding model to use (e.g., embeddinggemma)")
258	chunkerModel := fs.String("chunker-model", "fixed-bert-tokenizer", "Chunker model: fixed-bert-tokenizer, fixed-bpe-tokenizer, or any ONNX model directory name")
259	targetTokens := fs.Int("target-tokens", 512, "Target tokens for chunking")
260	overlapTokens := fs.Int("overlap-tokens", 50, "Overlap tokens for chunking")
261
262	var includePatterns StringSliceFlag
263	var excludePatterns StringSliceFlag
264	fs.Var(&includePatterns, "include", "Include pattern (can be repeated, supports ** wildcards)")
265	fs.Var(&excludePatterns, "exclude", "Exclude pattern (can be repeated, supports ** wildcards)")
266
267	if err := fs.Parse(args); err != nil {
268		return fmt.Errorf("failed to parse flags: %w", err)
269	}
270
271	if *dirPath == "" {
272		return fmt.Errorf("--dir flag is required")
273	}
274
275	ctx := context.Background()
276
277	// Create Antfly client
278	client, err := antfly.NewAntflyClient(*antflyURL, http.DefaultClient)
279	if err != nil {
280		return fmt.Errorf("failed to create Antfly client: %w", err)
281	}
282
283	fmt.Printf("=== docsaf sync - Full Pipeline ===\n")
284	fmt.Printf("Antfly URL: %s\n", *antflyURL)
285	fmt.Printf("Table: %s\n", *tableName)
286	fmt.Printf("Directory: %s\n", *dirPath)
287	fmt.Printf("Dry run: %v\n", *dryRun)
288	if len(includePatterns) > 0 {
289		fmt.Printf("Include patterns: %v\n", includePatterns)
290	}
291	if len(excludePatterns) > 0 {
292		fmt.Printf("Exclude patterns: %v\n", excludePatterns)
293	}
294	fmt.Printf("\n")
295
296	// Create table if requested
297	if *createTable {
298		fmt.Printf("Creating table '%s' with %d shards...\n", *tableName, *numShards)
299
300		// Create embedding index configuration
301		embeddingIndex, err := createEmbeddingIndex(*embeddingModel, *chunkerModel, *targetTokens, *overlapTokens)
302		if err != nil {
303			return fmt.Errorf("failed to create embedding index config: %w", err)
304		}
305
306		err = client.CreateTable(ctx, *tableName, antfly.CreateTableRequest{
307			NumShards: uint(*numShards),
308			Indexes: map[string]antfly.IndexConfig{
309				"embeddings": *embeddingIndex,
310			},
311		})
312		if err != nil {
313			log.Printf("Warning: Failed to create table (may already exist): %v\n", err)
314		} else {
315			fmt.Printf("✓ Table created with BM25 and embedding indexes (%s with %s chunking)\n\n",
316				*embeddingModel, *chunkerModel)
317		}
318
319		// Wait for shards to be ready
320		if err := waitForShardsReady(ctx, client, *tableName, 30*time.Second); err != nil {
321			return fmt.Errorf("error waiting for shards: %w", err)
322		}
323	}
324
325	// Verify path exists and is a directory
326	fileInfo, err := os.Stat(*dirPath)
327	if err != nil {
328		return fmt.Errorf("failed to access path: %w", err)
329	}
330
331	if !fileInfo.IsDir() {
332		return fmt.Errorf("--dir must be a directory")
333	}
334
335	// Create filesystem source and processor using library
336	source := docsaf.NewFilesystemSource(docsaf.FilesystemSourceConfig{
337		BaseDir:         *dirPath,
338		BaseURL:         *baseURL,
339		IncludePatterns: includePatterns,
340		ExcludePatterns: excludePatterns,
341	})
342	processor := docsaf.NewProcessor(source, docsaf.DefaultRegistry())
343
344	// Process all files in the directory
345	fmt.Printf("Processing documentation files (chunking by markdown headings)...\n")
346	sections, err := processor.Process(context.Background())
if err != nil {
	return fmt.Errorf("failed to process directory: %w", err)
}

fmt.Printf("✓ Found %d documents\n\n", len(sections))

if len(sections) == 0 {
	return fmt.Errorf("no supported files found in directory")
}

// Count sections by type
typeCounts := make(map[string]int)
for _, section := range sections {
	typeCounts[section.Type]++
}

fmt.Printf("Document types found:\n")
for docType, count := range typeCounts {
	fmt.Printf("  - %s: %d\n", docType, count)
}
fmt.Printf("\n")

// Show sample of documents
fmt.Printf("Sample documents:\n")
for i, section := range sections {
	if i >= 10 {
		fmt.Printf("  ... and %d more\n", len(sections)-i)
		break
	}
	fmt.Printf("  [%d] %s (%s) - %s\n",
		i+1, section.Title, section.Type, section.FilePath)
}
fmt.Printf("\n")

// Convert sections to records map
records := make(map[string]any)
for _, section := range sections {
	records[section.ID] = section.ToDocument()
}

// Perform batched linear merge
finalCursor, err := performBatchedLinearMerge(ctx, client, *tableName, records, *batchSize, *dryRun)
if err != nil {
	return fmt.Errorf("batched linear merge failed: %w", err)
}

// Final cleanup: delete any remaining documents beyond the last cursor
if finalCursor != "" && !*dryRun {
	fmt.Printf("\nPerforming final cleanup to remove orphaned documents...\n")
	cleanupResult, err := client.LinearMerge(ctx, *tableName, antfly.LinearMergeRequest{
		Records:      map[string]any{}, // Empty records
		LastMergedId: finalCursor,      // Start from last cursor
		DryRun:       false,
		SyncLevel:    antfly.SyncLevelAknn,
	})
	if err != nil {
		return fmt.Errorf("final cleanup failed: %w", err)
	}
	fmt.Printf("✓ Final cleanup completed in %s\n", cleanupResult.Took)
	fmt.Printf("  Deleted: %d orphaned documents\n", cleanupResult.Deleted)
}

fmt.Printf("\n✓ Sync completed successfully\n")
return nil
}

// ANCHOR_END: sync_cmd

// ANCHOR: create_embedding_index
// createEmbeddingIndex creates an embedding index configuration with chunking
func createEmbeddingIndex(embeddingModel, chunkerModel string, targetTokens, overlapTokens int) (*antfly.IndexConfig, error) {
	embeddingIndexConfig := antfly.IndexConfig{
		Name: "embeddings",
		Type: antfly.IndexTypeEmbeddings,
	}

	// Configure embedder
	embedder, err := antfly.NewEmbedderConfig(antfly.OllamaEmbedderConfig{
		Model: embeddingModel,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure embedder: %w", err)
	}

	// Configure chunker via Termite
	// Model can be "fixed-bert-tokenizer", "fixed-bpe-tokenizer", or any ONNX model directory name
	chunker := antfly.ChunkerConfig{}
	err = chunker.FromTermiteChunkerConfig(antfly.TermiteChunkerConfig{
		Model:         chunkerModel,
		TargetTokens:  targetTokens,
		OverlapTokens: overlapTokens,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure chunker: %w", err)
	}

	// Configure embedding index with chunking
	// Note: Dimension is calculated automatically based on the embedding model
	err = embeddingIndexConfig.FromEmbeddingsIndexConfig(antfly.EmbeddingsIndexConfig{
		Field:    "content",
		Embedder: *embedder,
		Chunker:  chunker,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure embedding index: %w", err)
	}

	return &embeddingIndexConfig, nil
}

// ANCHOR_END: create_embedding_index
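The `TargetTokens`/`OverlapTokens` settings above define fixed-size chunk windows that overlap by a set number of tokens. As an illustrative sketch only (this is not Termite's actual tokenizer or chunking algorithm, and `chunkWindows` is a hypothetical helper), overlapping windows over a token stream can be computed like this:

```go
package main

import "fmt"

// chunkWindows sketches fixed-size chunking with overlap: each window
// covers up to targetTokens tokens and starts (targetTokens - overlapTokens)
// past the previous one, so adjacent windows share overlapTokens tokens.
func chunkWindows(numTokens, targetTokens, overlapTokens int) [][2]int {
	stride := targetTokens - overlapTokens
	if stride <= 0 {
		stride = targetTokens // guard against overlap >= target
	}
	var windows [][2]int
	for start := 0; start < numTokens; start += stride {
		end := start + targetTokens
		if end > numTokens {
			end = numTokens
		}
		windows = append(windows, [2]int{start, end})
		if end == numTokens {
			break
		}
	}
	return windows
}

func main() {
	// 1000 tokens, 512-token windows with a 64-token overlap.
	fmt.Println(chunkWindows(1000, 512, 64)) // [[0 512] [448 960] [896 1000]]
}
```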

// ANCHOR: batched_linear_merge
// performBatchedLinearMerge performs LinearMerge in batches with progress logging
// Returns the final cursor position after processing all batches
func performBatchedLinearMerge(
	ctx context.Context,
	client *antfly.AntflyClient,
	tableName string,
	records map[string]any,
	batchSize int,
	dryRun bool,
) (string, error) {
	// Sort IDs for deterministic pagination (REQUIRED for linear merge!)
	// The cursor logic compares IDs in order, so batches must be submitted
	// in sorted order.
	sortedIDs := make([]string, 0, len(records))
	for id := range records {
		sortedIDs = append(sortedIDs, id)
	}
	// slices.Sort requires Go 1.21+; on older Go, use sort.Strings(sortedIDs)
	slices.Sort(sortedIDs)

	totalBatches := (len(sortedIDs) + batchSize - 1) / batchSize // ceiling division
	fmt.Printf("Loading %d documents in %d batches of up to %d records\n", len(sortedIDs), totalBatches, batchSize)

	cursor := ""
	totalUpserted := 0
	totalSkipped := 0
	totalDeleted := 0

	for batchNum := range totalBatches { // range over an int requires Go 1.22+
		// Calculate batch range
		start := batchNum * batchSize
		end := min(start+batchSize, len(sortedIDs))

		// Build batch records map
		batchIDs := sortedIDs[start:end]
		batchRecords := make(map[string]any, len(batchIDs))
		for _, id := range batchIDs {
			batchRecords[id] = records[id]
		}

		fmt.Printf("[Batch %d/%d] Merging records %d-%d\n",
			batchNum+1, totalBatches, start+1, end)

		// Execute LinearMerge with aknn sync level
		result, err := client.LinearMerge(ctx, tableName, antfly.LinearMergeRequest{
			Records:      batchRecords,
			LastMergedId: cursor,
			DryRun:       dryRun,
			SyncLevel:    antfly.SyncLevelAknn, // Wait for vector index writes
		})
		if err != nil {
			return "", fmt.Errorf("failed to perform linear merge for batch %d: %w", batchNum+1, err)
		}

		// Update cursor for next batch
		if result.NextCursor != "" {
			cursor = result.NextCursor
		} else {
			// Fallback: use last ID in batch
			cursor = batchIDs[len(batchIDs)-1]
		}

		// Accumulate totals
		totalUpserted += result.Upserted
		totalSkipped += result.Skipped
		totalDeleted += result.Deleted

		// Log batch progress
		fmt.Printf("  Completed in %s - Status: %s\n", result.Took, result.Status)
		fmt.Printf("  Upserted: %d, Skipped: %d, Deleted: %d, Keys scanned: %d\n",
			result.Upserted, result.Skipped, result.Deleted, result.KeysScanned)

		if len(result.Failed) > 0 {
			fmt.Printf("  Failed operations: %d\n", len(result.Failed))
			for i, fail := range result.Failed {
				if i >= 5 {
					fmt.Printf("    ... and %d more\n", len(result.Failed)-i)
					break
				}
				fmt.Printf("    [%d] ID=%s, Error=%s\n", i, fail.Id, fail.Error)
			}
		}
	}

	// Log final totals
	fmt.Printf("\n=== Linear Merge Complete ===\n")
	fmt.Printf("Total batches: %d\n", totalBatches)
	fmt.Printf("Total upserted: %d\n", totalUpserted)
	fmt.Printf("Total skipped: %d\n", totalSkipped)
	fmt.Printf("Total deleted: %d\n", totalDeleted)

	return cursor, nil
}

// ANCHOR_END: batched_linear_merge
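The batching above splits the sorted ID list into `ceil(n/batchSize)` contiguous half-open windows, the last of which may be short. A minimal, self-contained sketch of that arithmetic (`batchRanges` is a hypothetical helper, not part of the antfly client):

```go
package main

import "fmt"

// batchRanges mirrors the batching arithmetic used by
// performBatchedLinearMerge: n items split into ceil(n/k) contiguous
// [start, end) windows of at most k items each.
func batchRanges(n, k int) [][2]int {
	total := (n + k - 1) / k // ceiling division
	ranges := make([][2]int, 0, total)
	for b := 0; b < total; b++ {
		start := b * k
		end := start + k
		if end > n {
			end = n // final batch may be short
		}
		ranges = append(ranges, [2]int{start, end})
	}
	return ranges
}

func main() {
	// 10 records in batches of 4 -> 3 batches, last one short.
	fmt.Println(batchRanges(10, 4)) // [[0 4] [4 8] [8 10]]
}
```

Because the IDs are sorted before batching, each window's last ID is a valid cursor: every ID in the next batch compares greater than it.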

// waitForShardsReady polls the table status until shards are ready to accept writes
func waitForShardsReady(ctx context.Context, client *antfly.AntflyClient, tableName string, timeout time.Duration) error {
	fmt.Printf("Waiting for shards to be ready...\n")

	deadline := time.Now().Add(timeout)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	pollCount := 0

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled while waiting for shards: %w", ctx.Err())
		case <-ticker.C:
			pollCount++

			if time.Now().After(deadline) {
				return fmt.Errorf("timeout waiting for shards after %d polls", pollCount)
			}

			// Get table status
			status, err := client.GetTable(ctx, tableName)
			if err != nil {
				fmt.Printf("  [Poll %d] Error getting table status: %v\n", pollCount, err)
				continue
			}

			// Check if we have at least one shard
			if len(status.Shards) > 0 {
				// Wait longer to ensure leader election completes and propagates
				if pollCount >= 6 {
					fmt.Printf("✓ Shards ready after %d polls (~%dms)\n\n", pollCount, pollCount*500)
					return nil
				}
				fmt.Printf("  [Poll %d] Found %d shard(s), waiting for leader status to propagate\n", pollCount, len(status.Shards))
			} else {
				fmt.Printf("  [Poll %d] No shards found yet\n", pollCount)
			}
		}
	}
}

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "docsaf - Documentation Sync to Antfly\n\n")
		fmt.Fprintf(os.Stderr, "Usage:\n")
		fmt.Fprintf(os.Stderr, "  docsaf prepare [flags]  - Process files and create sorted JSON data\n")
		fmt.Fprintf(os.Stderr, "  docsaf load [flags]     - Load JSON data into Antfly\n")
		fmt.Fprintf(os.Stderr, "  docsaf sync [flags]     - Full pipeline (prepare + load)\n")
		fmt.Fprintf(os.Stderr, "\nCommands:\n")
		fmt.Fprintf(os.Stderr, "  prepare  Process documentation files and save to JSON\n")
		fmt.Fprintf(os.Stderr, "  load     Load prepared JSON data into Antfly table\n")
		fmt.Fprintf(os.Stderr, "  sync     Process files and load directly (prepare + load in one step)\n")
		fmt.Fprintf(os.Stderr, "\nExamples:\n")
		fmt.Fprintf(os.Stderr, "  # Prepare data\n")
		fmt.Fprintf(os.Stderr, "  docsaf prepare --dir /path/to/docs --output docs.json\n\n")
		fmt.Fprintf(os.Stderr, "  # Load prepared data\n")
		fmt.Fprintf(os.Stderr, "  docsaf load --input docs.json --table docs --create-table\n\n")
		fmt.Fprintf(os.Stderr, "  # Full pipeline\n")
		fmt.Fprintf(os.Stderr, "  docsaf sync --dir /path/to/docs --table docs --create-table\n\n")
		os.Exit(1)
	}

	var err error
	switch os.Args[1] {
	case "prepare":
		err = prepareCmd(os.Args[2:])
	case "load":
		err = loadCmd(os.Args[2:])
	case "sync":
		err = syncCmd(os.Args[2:])
	default:
		fmt.Fprintf(os.Stderr, "Unknown command: %s\n", os.Args[1])
		fmt.Fprintf(os.Stderr, "Valid commands: prepare, load, sync\n")
		os.Exit(1)
	}

	if err != nil {
		log.Fatalf("Error: %v", err)
	}
}