docsaf is a tool that syncs documentation files (Markdown, MDX, and OpenAPI specs) to Antfly using the Linear Merge API, with automatic change detection and type separation.
What is Linear Merge?
The Linear Merge API is a stateless, progressive synchronization API that:
- Efficiently syncs external data sources to Antfly tables
- Detects changes using content hashing (skips unchanged documents)
- Auto-deletes documents removed from the source
- Handles shard boundaries with cursor-based pagination
- Supports dry-run to preview changes before applying
Perfect for syncing documentation, configuration files, or any external data source.
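The merge semantics described above can be sketched in Go. This is a simplified model of one merge pass (not the Antfly implementation): stored and incoming documents are compared by content hash, and each ID is classified as upsert, skip, or delete.

```go
package main

import (
	"fmt"
	"sort"
)

// mergePlan simulates one linear-merge pass: stored maps doc ID to its
// content hash, incoming maps doc ID to the hash of the new content.
// IDs present in both with equal hashes are skipped; new or changed IDs
// are upserted; stored IDs absent from incoming are deleted.
func mergePlan(stored, incoming map[string]string) (upserts, skips, deletes []string) {
	for id, h := range incoming {
		if old, ok := stored[id]; ok && old == h {
			skips = append(skips, id)
		} else {
			upserts = append(upserts, id)
		}
	}
	for id := range stored {
		if _, ok := incoming[id]; !ok {
			deletes = append(deletes, id)
		}
	}
	sort.Strings(upserts)
	sort.Strings(skips)
	sort.Strings(deletes)
	return
}

func main() {
	stored := map[string]string{"doc_1": "aaa", "doc_2": "bbb", "doc_3": "ccc"}
	incoming := map[string]string{"doc_1": "aaa", "doc_2": "zzz", "doc_4": "ddd"}
	up, sk, del := mergePlan(stored, incoming)
	fmt.Println("upsert:", up, "skip:", sk, "delete:", del)
}
```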
Features
- Multi-format support: Processes Markdown (.md), MDX (.mdx), and OpenAPI (.yaml, .yml, .json) files
- Smart chunking: Uses goldmark to parse Markdown/MDX into sections by headings
- Frontmatter parsing: Extracts YAML frontmatter (title, description, etc.) from MDX files
- Wildcard filtering: Include/exclude files using glob patterns with `**` support
- OpenAPI parsing: Uses libopenapi to extract paths, schemas, and info from OpenAPI specs
- Type separation: Different document types are stored with distinct `_type` values for targeted querying
- Content hashing: Automatically skips unchanged sections on re-sync
- Incremental updates: Only updates modified sections
- Deletion detection: Removes sections that no longer exist in source
- Dry run mode: Preview what will change before committing
Document Types
docsaf creates documents with the following `_type` values:
| Type | Description | Source |
|---|---|---|
| `markdown_section` | Sections from `.md` files | Chunked by headings using goldmark |
| `mdx_section` | Sections from `.mdx` files | Chunked by headings using goldmark |
| `openapi_info` | API information | Info object from OpenAPI specs |
| `openapi_path` | API path operations | `GET /users`, `POST /orders`, etc. |
| `openapi_schema` | Data schemas | Component schemas from OpenAPI specs |
Each type has its own schema with specific metadata fields. See schemas.yaml for details.
Prerequisites
- Antfly running locally:

  ```bash
  cd /path/to/antfly
  go run ./cmd/antfly swarm
  ```

- Build docsaf (from the antfly root):

  ```bash
  go build -o docsaf ./examples/docsaf
  ```
Usage
docsaf has three subcommands:
- `prepare` - Process files and create sorted JSON data
- `load` - Load prepared JSON data into Antfly
- `sync` - Full pipeline (prepare + load in one step)
Subcommand: prepare
Process documentation files and save to JSON:
```bash
./docsaf prepare \
  --dir /path/to/your/docs \
  --output docs.json
```

Flags:
- `--dir` (required) - Path to directory containing documentation files
- `--output` - Output JSON file path (default: `docs.json`)
- `--base-url` - Base URL for generating document links (e.g., `https://docs.example.com`)
- `--include` - Include pattern (can be repeated, supports `**` wildcards)
- `--exclude` - Exclude pattern (can be repeated, supports `**` wildcards)
Subcommand: load
Load prepared JSON data into Antfly:
```bash
./docsaf load \
  --input docs.json \
  --table docs \
  --create-table
```

Dry run to preview changes:
```bash
./docsaf load \
  --input docs.json \
  --table docs \
  --dry-run
```

Flags:
- `--input` - Input JSON file path (default: `docs.json`)
- `--url` - Antfly API URL (default: `http://localhost:8080/api/v1`)
- `--table` - Table name to merge into (default: `docs`)
- `--create-table` - Create table if it doesn't exist (default: `false`)
- `--num-shards` - Number of shards for new table (default: `1`)
- `--batch-size` - Linear merge batch size (default: `25`)
- `--embedding-model` - Embedding model to use (default: `embeddinggemma`)
- `--chunker-model` - Chunker model: `fixed-bert-tokenizer`, `fixed-bpe-tokenizer`, or any ONNX model directory name (default: `fixed-bert-tokenizer`)
- `--target-tokens` - Target tokens for chunking (default: `512`)
- `--overlap-tokens` - Overlap tokens for chunking (default: `50`)
- `--dry-run` - Preview changes without applying them (default: `false`)
Subcommand: sync
Full pipeline - process files and load directly (original behavior):
```bash
./docsaf sync \
  --dir /path/to/your/docs \
  --table docs \
  --create-table
```

Flags:
- `--dir` (required) - Path to directory containing documentation files
- `--url` - Antfly API URL (default: `http://localhost:8080/api/v1`)
- `--table` - Table name to merge into (default: `docs`)
- `--base-url` - Base URL for generating document links (e.g., `https://docs.example.com`)
- `--create-table` - Create table if it doesn't exist (default: `false`)
- `--num-shards` - Number of shards for new table (default: `1`)
- `--batch-size` - Linear merge batch size (default: `25`)
- `--embedding-model` - Embedding model to use (default: `embeddinggemma`)
- `--chunker-model` - Chunker model: `fixed-bert-tokenizer`, `fixed-bpe-tokenizer`, or any ONNX model directory name (default: `fixed-bert-tokenizer`)
- `--target-tokens` - Target tokens for chunking (default: `512`)
- `--overlap-tokens` - Overlap tokens for chunking (default: `50`)
- `--dry-run` - Preview changes without applying them (default: `false`)
- `--include` - Include pattern (can be repeated, supports `**` wildcards)
- `--exclude` - Exclude pattern (can be repeated, supports `**` wildcards)
Example Workflows
1. Two-Step Workflow (Prepare + Load)
Step 1: Prepare the data
```bash
# Process files and create JSON
./docsaf prepare \
  --dir ./my-docs \
  --output my-docs.json

# Output:
# === docsaf prepare - Process Documentation Files ===
# ✓ Found 127 document sections
#
# Document types found:
#   - markdown_section: 45
#   - openapi_path: 52
#   - openapi_schema: 24
#   - openapi_info: 6
#
# ✓ Prepared data written to my-docs.json
```

Step 2: Load into Antfly
```bash
# Load the prepared JSON data
./docsaf load \
  --input my-docs.json \
  --table docs \
  --create-table

# Output:
# === docsaf load - Load Data to Antfly ===
# ✓ Loaded 127 records
#   Upserted: 127
#   Skipped: 0
#   Deleted: 0
```

Benefits of two-step workflow:
- Separate data processing from loading
- Can version control the JSON file
- Can manually inspect/modify the JSON before loading
- Can load the same data to multiple tables/clusters
2. One-Step Workflow (Sync)
```bash
# Process and load in one command
./docsaf sync \
  --dir ./my-docs \
  --table docs \
  --create-table

# Output:
# === docsaf sync - Full Pipeline ===
# ✓ Found 127 document sections
#   Upserted: 127
#   Skipped: 0
#   Deleted: 0
```

3. Re-sync (No Changes)
```bash
# Second run - nothing changed
./docsaf sync --dir ./my-docs --table docs

# Output:
# ✓ Found 127 document sections
#   Upserted: 0
#   Skipped: 127 (unchanged)
#   Deleted: 0
```

The content hash optimization means unchanged documents are skipped entirely - no expensive writes!
4. Incremental Update
After editing a file:
```bash
./docsaf sync --dir ./my-docs --table docs

# Output:
# ✓ Found 127 document sections
#   Upserted: 3 (updated sections)
#   Skipped: 124 (unchanged)
#   Deleted: 0
```

Only the modified sections are updated!
5. Sync Antfly's Own Documentation
```bash
# From the antfly repository root
./docsaf sync \
  --dir . \
  --create-table \
  --table antfly_docs
```

This will process:
- All markdown files (README.md, work-log/*.md, etc.)
- MDX documentation
- OpenAPI specs in `src/metadata/api.yaml` and `src/usermgr/api.yaml`
6. Sync www/ Website Documentation (with Wildcards)
```bash
# From the antfly repository root
# Using exclude patterns to skip build artifacts and code
./docsaf sync \
  --dir www \
  --exclude "**/node_modules/**" \
  --exclude "**/.next/**" \
  --exclude "**/out/**" \
  --exclude "**/work-log/**" \
  --exclude "**/scripts/**" \
  --exclude "**/config/**" \
  --exclude "**/components/**" \
  --exclude "**/app/**" \
  --exclude "**/public/**" \
  --table antfly_website_docs \
  --create-table
```

Or, more simply, using include patterns:
```bash
# Only index files in the content directory
./docsaf sync \
  --dir www \
  --include "**/content/**" \
  --table antfly_website_docs \
  --create-table
```

This will:
- Extract frontmatter (title, description) from MDX files
- Process 22 MDX documentation files in `www/content/docs/`
- Skip auto-generated navigation and build artifacts
- Create sections with frontmatter metadata
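Frontmatter extraction can be illustrated with a minimal sketch. This is a simplified stand-in for docsaf's YAML frontmatter parsing, handling only flat `key: value` pairs between the leading `---` fences:

```go
package main

import (
	"fmt"
	"strings"
)

// parseFrontmatter extracts flat `key: value` frontmatter pairs from the
// block delimited by leading `---` fences and returns the remaining body.
// (A real implementation would use a YAML parser for nested values.)
func parseFrontmatter(src string) (meta map[string]string, body string) {
	meta = map[string]string{}
	if !strings.HasPrefix(src, "---\n") {
		return meta, src
	}
	rest := src[len("---\n"):]
	end := strings.Index(rest, "\n---")
	if end < 0 {
		return meta, src
	}
	for _, line := range strings.Split(rest[:end], "\n") {
		if k, v, ok := strings.Cut(line, ":"); ok {
			meta[strings.TrimSpace(k)] = strings.TrimSpace(v)
		}
	}
	body = strings.TrimLeft(rest[end+len("\n---"):], "-\n")
	return meta, body
}

func main() {
	src := "---\ntitle: Quickstart\ndescription: Getting started\n---\n# Quickstart\n..."
	meta, body := parseFrontmatter(src)
	fmt.Println(meta["title"], "|", strings.Split(body, "\n")[0])
}
```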
How It Works
1. File Discovery
The `RepositoryTraverser` walks the directory tree and identifies files by extension:

- `.md`, `.mdx` → `MarkdownProcessor`
- `.yaml`, `.yml`, `.json` → `OpenAPIProcessor` (if valid OpenAPI)
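The dispatch can be sketched as a switch on the lowercased extension (processor names are illustrative, matching the mapping above):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// processorFor routes a file to a processor name by extension.
// Extensions are matched case-insensitively; unsupported files
// return "" and are skipped.
func processorFor(path string) string {
	switch strings.ToLower(filepath.Ext(path)) {
	case ".md", ".mdx":
		return "MarkdownProcessor"
	case ".yaml", ".yml", ".json":
		return "OpenAPIProcessor" // only used if the file parses as OpenAPI
	default:
		return ""
	}
}

func main() {
	for _, p := range []string{"docs/intro.MDX", "api.yaml", "logo.png"} {
		fmt.Printf("%s -> %q\n", p, processorFor(p))
	}
}
```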
2. Markdown/MDX Processing (goldmark)
Using goldmark's AST parser:
```markdown
# Main Title          → Section 1 (markdown_section)
Content here...

## Subsection A       → Section 2 (markdown_section)
More content...

## Subsection B       → Section 3 (markdown_section)
Even more content...
```

- Parse the document into an Abstract Syntax Tree
- Walk the AST and identify heading nodes
- Split content into sections at each heading
- Create a `DocumentSection` for each section with:
  - Unique ID (hash of file path + heading)
  - Title (heading text)
  - Content (section text with markdown formatting)
  - Metadata (heading level, is_mdx flag)
  - Type: `markdown_section` or `mdx_section`
3. OpenAPI Processing (libopenapi)
Using libopenapi to parse OpenAPI v3 specifications:
Input: `api.yaml` with paths, schemas, info

Output:
- Info document (`openapi_info`): API title, version, description
- Path documents (`openapi_path`): One per operation (GET /users, POST /orders)
- Schema documents (`openapi_schema`): One per component schema (User, Order)

Each document includes rich metadata (HTTP method, tags, operation ID, etc.)
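The path flattening can be sketched roughly as follows. The nested map literal stands in for libopenapi's parsed model; the output shape (one `openapi_path` document per operation) follows the description above:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// pathDocs flattens an OpenAPI-style paths map (path -> method -> summary)
// into one openapi_path document per operation.
func pathDocs(paths map[string]map[string]string) []map[string]string {
	var docs []map[string]string
	for p, ops := range paths {
		for method, summary := range ops {
			docs = append(docs, map[string]string{
				"_type":   "openapi_path",
				"title":   strings.ToUpper(method) + " " + p,
				"content": summary,
			})
		}
	}
	// Sort by title for stable output (map iteration order is random)
	sort.Slice(docs, func(i, j int) bool { return docs[i]["title"] < docs[j]["title"] })
	return docs
}

func main() {
	paths := map[string]map[string]string{
		"/users":  {"get": "List users", "post": "Create a user"},
		"/orders": {"post": "Create an order"},
	}
	for _, d := range pathDocs(paths) {
		fmt.Println(d["title"], "-", d["content"])
	}
}
```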
4. Linear Merge Process
The core sync logic uses batched Linear Merge with cursor-based pagination:
```go
// performBatchedLinearMerge performs LinearMerge in batches with progress logging
// Returns the final cursor position after processing all batches
func performBatchedLinearMerge(
	ctx context.Context,
	client *antfly.AntflyClient,
	tableName string,
	records map[string]any,
	batchSize int,
	dryRun bool,
) (string, error) {
	// Sort IDs for deterministic pagination (REQUIRED for linear merge!)
	ids := make([]string, 0, len(records))
	for id := range records {
		ids = append(ids, id)
	}
	// CRITICAL: Must sort IDs for linear merge cursor logic to work correctly
	sortedIDs := make([]string, len(ids))
	copy(sortedIDs, ids)
	// Use slices.Sort from Go 1.21+
	// If on older Go, use: sort.Strings(sortedIDs)
	slices.Sort(sortedIDs)

	totalBatches := (len(sortedIDs) + batchSize - 1) / batchSize
	fmt.Printf("Loading %d documents in %d batches of %d records\n", len(sortedIDs), totalBatches, batchSize)
	// ...
}
```

The overall flow:

```
┌─────────────────────┐
│ Documentation Files │
│ (.md, .mdx, .yaml)  │
└──────────┬──────────┘
           │ 1. Discover & process files
           ▼
┌─────────────────────┐
│  Document Sections  │  {id_1: doc_1, id_2: doc_2, ...}
│    (with _type)     │
└──────────┬──────────┘
           │ 2. Linear Merge API
           ▼
┌─────────────────────┐
│  For each range:    │
│   - Scan storage    │
│   - Compare hash    │
│   - Upsert/Skip     │
│   - Delete old      │
└──────────┬──────────┘
           │ 3. Result
           ▼
┌─────────────────────┐
│    Antfly Table     │  Synced with source!
│  (typed documents)  │
└─────────────────────┘
```

5. Content Hash Optimization
Each document's content is hashed:
```
hash := xxhash(canonical_json(document))
```

If the hash matches what's already in storage → SKIP (no write needed!)
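The skip check can be sketched with standard-library pieces. The real implementation uses xxhash; FNV-1a stands in here. Go's `encoding/json` sorts map keys when marshaling, which gives a canonical byte form for map documents:

```go
package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
)

// contentHash marshals the document to JSON (map keys are sorted,
// yielding a canonical form) and hashes the bytes with FNV-1a.
func contentHash(doc map[string]any) (uint64, error) {
	b, err := json.Marshal(doc)
	if err != nil {
		return 0, err
	}
	h := fnv.New64a()
	h.Write(b)
	return h.Sum64(), nil
}

func main() {
	a := map[string]any{"title": "Intro", "content": "Hello"}
	b := map[string]any{"content": "Hello", "title": "Intro"} // same fields, different order
	ha, _ := contentHash(a)
	hb, _ := contentHash(b)
	fmt.Println("equal hashes, skip write:", ha == hb)
}
```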
6. Auto-Deletion
Documents in the table but not in the merge request are automatically deleted:
```
Storage: [doc_1, doc_2, doc_3, doc_4]
Request: [doc_1, doc_2, doc_5]   ← doc_5 is new

Result:
- doc_1: SKIP   (unchanged)
- doc_2: SKIP   (unchanged)
- doc_3: DELETE (not in request)
- doc_4: DELETE (not in request)
- doc_5: UPSERT (new)
```

Querying the Data
Once ingested, you can query by document type:
Search all markdown sections
```bash
curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "match": {
        "_type": "markdown_section"
      }
    },
    "full_text_query": "authentication"
  }'
```

Search OpenAPI paths
```bash
curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "match": {
        "_type": "openapi_path"
      }
    },
    "full_text_query": "users"
  }'
```

Filter by HTTP method
```bash
curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "bool": {
        "must": [
          {"match": {"_type": "openapi_path"}},
          {"match": {"metadata.http_method": "get"}}
        ]
      }
    }
  }'
```

Get OpenAPI schemas
```bash
curl -X POST http://localhost:8080/api/v1/tables/docs/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "match": {
        "_type": "openapi_schema"
      }
    }
  }'
```

Document Schemas
The `schemas.yaml` file contains example schema definitions for each document type with Antfly's custom annotations:

- `x-antfly-types`: Specify field types (`text`, `keyword`, `embedding`, etc.)
- `x-antfly-index`: Enable/disable indexing for a field
- `x-antfly-include-in-all`: Include fields in the Bleve `_all` field for full-text search
Example for markdown_section:
```yaml
markdown_section:
  type: object
  x-antfly-include-in-all:
    - title
    - content
  properties:
    title:
      type: string
      x-antfly-types:
        - text
        - keyword
    content:
      type: string
      x-antfly-types:
        - text
    _type:
      type: string
      x-antfly-types:
        - keyword
```

Advanced Features
Hybrid Search with Embeddings
Add an embedding enricher to enable semantic search. Here's how docsaf creates embedding indexes:
```go
// createEmbeddingIndex creates an embedding index configuration with chunking
func createEmbeddingIndex(embeddingModel, chunkerModel string, targetTokens, overlapTokens int) (*antfly.IndexConfig, error) {
	embeddingIndexConfig := antfly.IndexConfig{
		Name: "embeddings",
		Type: antfly.IndexTypeEmbeddings,
	}

	// Configure embedder
	embedder, err := antfly.NewEmbedderConfig(antfly.OllamaEmbedderConfig{
		Model: embeddingModel,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure embedder: %w", err)
	}

	// Configure chunker via Termite
	// Model can be "fixed-bert-tokenizer", "fixed-bpe-tokenizer", or any ONNX model directory name
	chunker := antfly.ChunkerConfig{}
	err = chunker.FromTermiteChunkerConfig(antfly.TermiteChunkerConfig{
		Model:         chunkerModel,
		TargetTokens:  targetTokens,
		OverlapTokens: overlapTokens,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure chunker: %w", err)
	}
	// ...
}
```

You can also manually configure indexes:
```go
// Add indexes to the table
client.UpdateTable(ctx, "docs", antfly.UpdateTableRequest{
	Indexes: []antfly.IndexConfig{
		{
			Type: "full_text_v0",
			Name: "bleve",
		},
		{
			Type: "aknn_v0",
			Name: "embeddings",
			Config: map[string]any{
				"embedding_field": "content_embedding",
			},
		},
		{
			Type: "embeddingenricher",
			Name: "enricher",
			Config: map[string]any{
				"text_fields":  []string{"content"},
				"output_field": "content_embedding",
				"model":        "text-embedding-3-small",
			},
		},
	},
})
```

Incremental Updates (Cron)
Run docsaf periodically to sync changes:
```bash
# Cron job: sync docs every hour
0 * * * * cd /path/to/antfly && ./docsaf sync --dir /path/to/docs --table docs
```

Or use the two-step workflow to separate data processing from loading:
```bash
# Cron job: prepare data every hour, load separately
0 * * * * cd /path/to/antfly && ./docsaf prepare --dir /path/to/docs --output /tmp/docs.json
15 * * * * cd /path/to/antfly && ./docsaf load --input /tmp/docs.json --table docs
```

Only new or modified files will be updated.
Filter by File Type
Query specific document types:
```bash
# Only MDX files
curl ... -d '{"query": {"match": {"_type": "mdx_section"}}}'

# Only OpenAPI paths with tag "users"
curl ... -d '{
  "query": {
    "bool": {
      "must": [
        {"match": {"_type": "openapi_path"}},
        {"match": {"metadata.tags": "users"}}
      ]
    }
  }
}'
```

Implementation Details
ID Generation
Document IDs are generated using SHA-256 hashes:
```
doc_<hash(file_path + identifier)[:16]>
```

This ensures:
- Stable IDs across runs (same file → same ID)
- Uniqueness within the repository
- Efficient lookups
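A minimal sketch of this scheme, assuming the ID keeps the first 16 hex characters of the SHA-256 digest (the `docID` helper name is illustrative):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// docID derives a stable document ID from the file path plus a section
// identifier (e.g. the heading text): SHA-256, hex-encoded, truncated
// to 16 characters and prefixed with "doc_".
func docID(filePath, identifier string) string {
	sum := sha256.Sum256([]byte(filePath + identifier))
	return "doc_" + hex.EncodeToString(sum[:])[:16]
}

func main() {
	// Same inputs always yield the same ID across runs
	fmt.Println(docID("docs/intro.md", "Getting Started"))
}
```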
Error Handling
- Invalid OpenAPI files are logged and skipped (doesn't fail the entire run)
- File read errors are logged and skipped
- Parse errors are logged with file path for debugging
Performance Characteristics
| Operation | Complexity | Notes |
|---|---|---|
| Initial import | O(n) writes | All sections inserted |
| Re-sync unchanged | O(n) reads, 0 writes | Hash comparison only |
| Incremental update | O(n) reads, O(k) writes | k = changed sections |
| Deletion scan | O(n) reads | Identifies removed sections |
Batch size recommendations:
- Small directories: 100-500 sections/request
- Medium directories: 1,000-5,000 sections/request
- Large directories: 10,000+ sections/request (may span multiple batches)
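Batching follows the same shape regardless of directory size: sort the record IDs (required for the merge cursor) and slice them into fixed-size chunks, as `performBatchedLinearMerge` does. A standalone sketch:

```go
package main

import (
	"fmt"
	"slices"
)

// batches sorts the record IDs and splits them into batches of at most
// batchSize. Sorting is required for deterministic cursor pagination.
func batches(records map[string]any, batchSize int) [][]string {
	ids := make([]string, 0, len(records))
	for id := range records {
		ids = append(ids, id)
	}
	slices.Sort(ids)
	var out [][]string
	for start := 0; start < len(ids); start += batchSize {
		end := min(start+batchSize, len(ids))
		out = append(out, ids[start:end])
	}
	return out
}

func main() {
	recs := map[string]any{"c": 1, "a": 2, "b": 3, "d": 4, "e": 5}
	for i, b := range batches(recs, 2) {
		fmt.Printf("batch %d: %v\n", i+1, b)
	}
}
```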
Troubleshooting
Table doesn't exist:
```bash
# Add the --create-table flag
./docsaf sync --dir . --create-table
```

Connection refused:
```bash
# Make sure Antfly is running
go run ./cmd/antfly swarm
```

No documents found:
- Ensure the directory contains `.md`, `.mdx`, or valid OpenAPI spec files
- Check file extensions (case-insensitive)
- OpenAPI files must be valid v3 specifications
OpenAPI parsing errors:
- Verify the file is a valid OpenAPI 3.x specification
- Use a validator like swagger-editor
- Check for YAML/JSON syntax errors
- Note: Invalid OpenAPI files are logged and skipped
Real-World Use Cases
- Documentation Sync: Keep Antfly in sync with documentation directories (Markdown, MDX, OpenAPI)
- API Documentation: Ingest OpenAPI specs for searchable API reference
- Knowledge Base: Import and maintain wiki/docs from external sources
- Multi-Format Search: Search across markdown docs and API specs simultaneously
- Developer Portal: Build searchable developer documentation from multiple sources
Next Steps
- Add embedding generation to enable semantic search across all document types
- Implement Git integration to track document versions and authors
- Add support for more file formats (reStructuredText, AsciiDoc, etc.)
- Create webhooks for automatic re-sync on file changes
- Build a UI for browsing documents by type
API Reference
See the full Linear Merge API documentation:
- OpenAPI spec: `src/metadata/api.yaml` (search for `LinearMerge`)
- goldmark documentation
- libopenapi documentation
- libopenapi documentation
- OpenAPI Specification
Project Files
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "flag"
7 "fmt"
8 "log"
9 "net/http"
10 "os"
11 "slices"
12 "strings"
13 "time"
14
15 "github.com/antflydb/antfly-go/antfly"
16 "github.com/antflydb/antfly-go/docsaf"
17)
18
19// StringSliceFlag allows repeated flags to build a slice
20type StringSliceFlag []string
21
22func (s *StringSliceFlag) String() string {
23 return strings.Join(*s, ", ")
24}
25
26func (s *StringSliceFlag) Set(value string) error {
27 *s = append(*s, value)
28 return nil
29}
30
31// ANCHOR: prepare_cmd
32func prepareCmd(args []string) error {
33 fs := flag.NewFlagSet("prepare", flag.ExitOnError)
34 dirPath := fs.String("dir", "", "Path to directory containing documentation files (required)")
35 outputFile := fs.String("output", "docs.json", "Output JSON file path")
36 baseURL := fs.String("base-url", "", "Base URL for generating document links (optional)")
37
38 var includePatterns StringSliceFlag
39 var excludePatterns StringSliceFlag
40 fs.Var(&includePatterns, "include", "Include pattern (can be repeated, supports ** wildcards)")
41 fs.Var(&excludePatterns, "exclude", "Exclude pattern (can be repeated, supports ** wildcards)")
42
43 if err := fs.Parse(args); err != nil {
44 return fmt.Errorf("failed to parse flags: %w", err)
45 }
46
47 if *dirPath == "" {
48 return fmt.Errorf("--dir flag is required")
49 }
50
51 // Verify path exists and is a directory
52 fileInfo, err := os.Stat(*dirPath)
53 if err != nil {
54 return fmt.Errorf("failed to access path: %w", err)
55 }
56
57 if !fileInfo.IsDir() {
58 return fmt.Errorf("--dir must be a directory")
59 }
60
61 fmt.Printf("=== docsaf prepare - Process Documentation Files ===\n")
62 fmt.Printf("Directory: %s\n", *dirPath)
63 fmt.Printf("Output: %s\n", *outputFile)
64 if len(includePatterns) > 0 {
65 fmt.Printf("Include patterns: %v\n", includePatterns)
66 }
67 if len(excludePatterns) > 0 {
68 fmt.Printf("Exclude patterns: %v\n", excludePatterns)
69 }
70 fmt.Printf("\n")
71
72 // Create filesystem source and processor using library
73 source := docsaf.NewFilesystemSource(docsaf.FilesystemSourceConfig{
74 BaseDir: *dirPath,
75 BaseURL: *baseURL,
76 IncludePatterns: includePatterns,
77 ExcludePatterns: excludePatterns,
78 })
79 processor := docsaf.NewProcessor(source, docsaf.DefaultRegistry())
80
81 // Process all files in the directory
82 fmt.Printf("Processing documentation files (chunking by markdown headings)...\n")
83 sections, err := processor.Process(context.Background())
84 if err != nil {
85 return fmt.Errorf("failed to process directory: %w", err)
86 }
87
88 fmt.Printf("✓ Found %d documents\n\n", len(sections))
89
90 if len(sections) == 0 {
91 return fmt.Errorf("no supported files found in directory")
92 }
93
94 // Count sections by type
95 typeCounts := make(map[string]int)
96 for _, section := range sections {
97 typeCounts[section.Type]++
98 }
99
100 fmt.Printf("Document types found:\n")
101 for docType, count := range typeCounts {
102 fmt.Printf(" - %s: %d\n", docType, count)
103 }
104 fmt.Printf("\n")
105
106 // Show sample of documents
107 fmt.Printf("Sample documents:\n")
108 for i, section := range sections {
109 if i >= 10 {
110 fmt.Printf(" ... and %d more\n", len(sections)-i)
111 break
112 }
113 fmt.Printf(" [%d] %s (%s) - %s\n",
114 i+1, section.Title, section.Type, section.FilePath)
115 }
116 fmt.Printf("\n")
117
118 // Convert sections to records map (sorted by ID for consistent ordering)
119 records := make(map[string]any)
120 for _, section := range sections {
121 records[section.ID] = section.ToDocument()
122 }
123
124 // Write to JSON file
125 fmt.Printf("Writing %d records to %s...\n", len(records), *outputFile)
126 jsonData, err := json.MarshalIndent(records, "", " ")
127 if err != nil {
128 return fmt.Errorf("failed to marshal JSON: %w", err)
129 }
130
131 err = os.WriteFile(*outputFile, jsonData, 0644)
132 if err != nil {
133 return fmt.Errorf("failed to write file: %w", err)
134 }
135
136 fmt.Printf("✓ Prepared data written to %s\n", *outputFile)
137 return nil
138}
139
140// ANCHOR_END: prepare_cmd
141
142// ANCHOR: load_cmd
143func loadCmd(args []string) error {
144 fs := flag.NewFlagSet("load", flag.ExitOnError)
145 antflyURL := fs.String("url", "http://localhost:8080/api/v1", "Antfly API URL")
146 tableName := fs.String("table", "docs", "Table name to merge into")
147 inputFile := fs.String("input", "docs.json", "Input JSON file path")
148 dryRun := fs.Bool("dry-run", false, "Preview changes without applying them")
149 createTable := fs.Bool("create-table", false, "Create table if it doesn't exist")
150 numShards := fs.Int("num-shards", 1, "Number of shards for new table")
151 batchSize := fs.Int("batch-size", 25, "Linear merge batch size")
152 embeddingModel := fs.String("embedding-model", "embeddinggemma", "Embedding model to use (e.g., embeddinggemma)")
153 chunkerModel := fs.String("chunker-model", "fixed-bert-tokenizer", "Chunker model: fixed-bert-tokenizer, fixed-bpe-tokenizer, or any ONNX model directory name")
154 targetTokens := fs.Int("target-tokens", 512, "Target tokens for chunking")
155 overlapTokens := fs.Int("overlap-tokens", 50, "Overlap tokens for chunking")
156 if err := fs.Parse(args); err != nil {
157 return fmt.Errorf("failed to parse flags: %w", err)
158 }
159
160 ctx := context.Background()
161
162 // Create Antfly client
163 client, err := antfly.NewAntflyClient(*antflyURL, http.DefaultClient)
164 if err != nil {
165 return fmt.Errorf("failed to create Antfly client: %w", err)
166 }
167
168 fmt.Printf("=== docsaf load - Load Data to Antfly ===\n")
169 fmt.Printf("Antfly URL: %s\n", *antflyURL)
170 fmt.Printf("Table: %s\n", *tableName)
171 fmt.Printf("Input: %s\n", *inputFile)
172 fmt.Printf("Dry run: %v\n\n", *dryRun)
173
174 // Create table if requested
175 if *createTable {
176 fmt.Printf("Creating table '%s' with %d shards...\n", *tableName, *numShards)
177
178 // Create embedding index configuration
179 embeddingIndex, err := createEmbeddingIndex(*embeddingModel, *chunkerModel, *targetTokens, *overlapTokens)
180 if err != nil {
181 return fmt.Errorf("failed to create embedding index config: %w", err)
182 }
183
184 err = client.CreateTable(ctx, *tableName, antfly.CreateTableRequest{
185 NumShards: uint(*numShards),
186 Indexes: map[string]antfly.IndexConfig{
187 "embeddings": *embeddingIndex,
188 },
189 })
190 if err != nil {
191 log.Printf("Warning: Failed to create table (may already exist): %v\n", err)
192 } else {
193 fmt.Printf("✓ Table created with BM25 and embedding indexes (%s with %s chunking)\n\n",
194 *embeddingModel, *chunkerModel)
195 }
196
197 // Wait for shards to be ready
198 if err := waitForShardsReady(ctx, client, *tableName, 30*time.Second); err != nil {
199 return fmt.Errorf("error waiting for shards: %w", err)
200 }
201 }
202
203 // Read JSON file
204 fmt.Printf("Reading records from %s...\n", *inputFile)
205 jsonData, err := os.ReadFile(*inputFile)
206 if err != nil {
207 return fmt.Errorf("failed to read file: %w", err)
208 }
209
210 var records map[string]any
211 err = json.Unmarshal(jsonData, &records)
212 if err != nil {
213 return fmt.Errorf("failed to unmarshal JSON: %w", err)
214 }
215
216 fmt.Printf("✓ Loaded %d records\n\n", len(records))
217
218 // Perform batched linear merge
219 finalCursor, err := performBatchedLinearMerge(ctx, client, *tableName, records, *batchSize, *dryRun)
220 if err != nil {
221 return fmt.Errorf("batched linear merge failed: %w", err)
222 }
223
224 // Final cleanup: delete any remaining documents beyond the last cursor
225 if finalCursor != "" && !*dryRun {
226 fmt.Printf("\nPerforming final cleanup to remove orphaned documents...\n")
227 cleanupResult, err := client.LinearMerge(ctx, *tableName, antfly.LinearMergeRequest{
228 Records: map[string]any{}, // Empty records
229 LastMergedId: finalCursor, // Start from last cursor
230 DryRun: false,
231 SyncLevel: antfly.SyncLevelAknn,
232 })
233 if err != nil {
234 return fmt.Errorf("final cleanup failed: %w", err)
235 }
236 fmt.Printf("✓ Final cleanup completed in %s\n", cleanupResult.Took)
237 fmt.Printf(" Deleted: %d orphaned documents\n", cleanupResult.Deleted)
238 }
239
240 fmt.Printf("\n✓ Load completed successfully\n")
241 return nil
242}
243
244// ANCHOR_END: load_cmd
245
246// ANCHOR: sync_cmd
247func syncCmd(args []string) error {
248 fs := flag.NewFlagSet("sync", flag.ExitOnError)
249 antflyURL := fs.String("url", "http://localhost:8080/api/v1", "Antfly API URL")
250 tableName := fs.String("table", "docs", "Table name to merge into")
251 dirPath := fs.String("dir", "", "Path to directory containing documentation files (required)")
252 baseURL := fs.String("base-url", "", "Base URL for generating document links (optional)")
253 dryRun := fs.Bool("dry-run", false, "Preview changes without applying them")
254 createTable := fs.Bool("create-table", false, "Create table if it doesn't exist")
255 numShards := fs.Int("num-shards", 1, "Number of shards for new table")
256 batchSize := fs.Int("batch-size", 25, "Linear merge batch size")
257 embeddingModel := fs.String("embedding-model", "embeddinggemma", "Embedding model to use (e.g., embeddinggemma)")
258 chunkerModel := fs.String("chunker-model", "fixed-bert-tokenizer", "Chunker model: fixed-bert-tokenizer, fixed-bpe-tokenizer, or any ONNX model directory name")
259 targetTokens := fs.Int("target-tokens", 512, "Target tokens for chunking")
260 overlapTokens := fs.Int("overlap-tokens", 50, "Overlap tokens for chunking")
261
262 var includePatterns StringSliceFlag
263 var excludePatterns StringSliceFlag
264 fs.Var(&includePatterns, "include", "Include pattern (can be repeated, supports ** wildcards)")
265 fs.Var(&excludePatterns, "exclude", "Exclude pattern (can be repeated, supports ** wildcards)")
266
267 if err := fs.Parse(args); err != nil {
268 return fmt.Errorf("failed to parse flags: %w", err)
269 }
270
271 if *dirPath == "" {
272 return fmt.Errorf("--dir flag is required")
273 }
274
275 ctx := context.Background()
276
277 // Create Antfly client
278 client, err := antfly.NewAntflyClient(*antflyURL, http.DefaultClient)
279 if err != nil {
280 return fmt.Errorf("failed to create Antfly client: %w", err)
281 }
282
283 fmt.Printf("=== docsaf sync - Full Pipeline ===\n")
284 fmt.Printf("Antfly URL: %s\n", *antflyURL)
285 fmt.Printf("Table: %s\n", *tableName)
286 fmt.Printf("Directory: %s\n", *dirPath)
287 fmt.Printf("Dry run: %v\n", *dryRun)
288 if len(includePatterns) > 0 {
289 fmt.Printf("Include patterns: %v\n", includePatterns)
290 }
291 if len(excludePatterns) > 0 {
292 fmt.Printf("Exclude patterns: %v\n", excludePatterns)
293 }
294 fmt.Printf("\n")
295
296 // Create table if requested
297 if *createTable {
298 fmt.Printf("Creating table '%s' with %d shards...\n", *tableName, *numShards)
299
300 // Create embedding index configuration
301 embeddingIndex, err := createEmbeddingIndex(*embeddingModel, *chunkerModel, *targetTokens, *overlapTokens)
302 if err != nil {
303 return fmt.Errorf("failed to create embedding index config: %w", err)
304 }
305
306 err = client.CreateTable(ctx, *tableName, antfly.CreateTableRequest{
307 NumShards: uint(*numShards),
308 Indexes: map[string]antfly.IndexConfig{
309 "embeddings": *embeddingIndex,
310 },
311 })
312 if err != nil {
313 log.Printf("Warning: Failed to create table (may already exist): %v\n", err)
314 } else {
315 fmt.Printf("✓ Table created with BM25 and embedding indexes (%s with %s chunking)\n\n",
316 *embeddingModel, *chunkerModel)
317 }
318
319 // Wait for shards to be ready
320 if err := waitForShardsReady(ctx, client, *tableName, 30*time.Second); err != nil {
321 return fmt.Errorf("error waiting for shards: %w", err)
322 }
323 }
324
325 // Verify path exists and is a directory
326 fileInfo, err := os.Stat(*dirPath)
327 if err != nil {
328 return fmt.Errorf("failed to access path: %w", err)
329 }
330
331 if !fileInfo.IsDir() {
332 return fmt.Errorf("--dir must be a directory")
333 }
334
335 // Create filesystem source and processor using library
336 source := docsaf.NewFilesystemSource(docsaf.FilesystemSourceConfig{
337 BaseDir: *dirPath,
338 BaseURL: *baseURL,
339 IncludePatterns: includePatterns,
340 ExcludePatterns: excludePatterns,
341 })
342 processor := docsaf.NewProcessor(source, docsaf.DefaultRegistry())
343
344 // Process all files in the directory
345 fmt.Printf("Processing documentation files (chunking by markdown headings)...\n")
346 sections, err := processor.Process(context.Background())
347 if err != nil {
348 return fmt.Errorf("failed to process directory: %w", err)
349 }
350
351 fmt.Printf("✓ Found %d documents\n\n", len(sections))
352
353 if len(sections) == 0 {
354 return fmt.Errorf("no supported files found in directory")
355 }
356
357 // Count sections by type
358 typeCounts := make(map[string]int)
359 for _, section := range sections {
360 typeCounts[section.Type]++
361 }
362
363 fmt.Printf("Document types found:\n")
364 for docType, count := range typeCounts {
365 fmt.Printf(" - %s: %d\n", docType, count)
366 }
367 fmt.Printf("\n")
368
369 // Show sample of documents
370 fmt.Printf("Sample documents:\n")
371 for i, section := range sections {
372 if i >= 10 {
373 fmt.Printf(" ... and %d more\n", len(sections)-i)
374 break
375 }
376 fmt.Printf(" [%d] %s (%s) - %s\n",
377 i+1, section.Title, section.Type, section.FilePath)
378 }
379 fmt.Printf("\n")
380
381 // Convert sections to records map
382 records := make(map[string]any)
383 for _, section := range sections {
384 records[section.ID] = section.ToDocument()
385 }
386
387 // Perform batched linear merge
388 finalCursor, err := performBatchedLinearMerge(ctx, client, *tableName, records, *batchSize, *dryRun)
389 if err != nil {
390 return fmt.Errorf("batched linear merge failed: %w", err)
391 }
392
393 // Final cleanup: delete any remaining documents beyond the last cursor
394 if finalCursor != "" && !*dryRun {
395 fmt.Printf("\nPerforming final cleanup to remove orphaned documents...\n")
396 cleanupResult, err := client.LinearMerge(ctx, *tableName, antfly.LinearMergeRequest{
397 Records: map[string]any{}, // Empty records
398 LastMergedId: finalCursor, // Start from last cursor
399 DryRun: false,
400 SyncLevel: antfly.SyncLevelAknn,
401 })
402 if err != nil {
403 return fmt.Errorf("final cleanup failed: %w", err)
404 }
405 fmt.Printf("✓ Final cleanup completed in %s\n", cleanupResult.Took)
406 fmt.Printf(" Deleted: %d orphaned documents\n", cleanupResult.Deleted)
407 }
408
409 fmt.Printf("\n✓ Sync completed successfully\n")
410 return nil
411}
412
413// ANCHOR_END: sync_cmd
414
// ANCHOR: create_embedding_index
// createEmbeddingIndex creates an embedding index configuration with chunking.
func createEmbeddingIndex(embeddingModel, chunkerModel string, targetTokens, overlapTokens int) (*antfly.IndexConfig, error) {
	embeddingIndexConfig := antfly.IndexConfig{
		Name: "embeddings",
		Type: antfly.IndexTypeEmbeddings,
	}

	// Configure the embedder
	embedder, err := antfly.NewEmbedderConfig(antfly.OllamaEmbedderConfig{
		Model: embeddingModel,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure embedder: %w", err)
	}

	// Configure the chunker via Termite.
	// Model can be "fixed-bert-tokenizer", "fixed-bpe-tokenizer", or any ONNX model directory name.
	chunker := antfly.ChunkerConfig{}
	err = chunker.FromTermiteChunkerConfig(antfly.TermiteChunkerConfig{
		Model:         chunkerModel,
		TargetTokens:  targetTokens,
		OverlapTokens: overlapTokens,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure chunker: %w", err)
	}

	// Configure the embedding index with chunking.
	// Note: the dimension is calculated automatically from the embedding model.
	err = embeddingIndexConfig.FromEmbeddingsIndexConfig(antfly.EmbeddingsIndexConfig{
		Field:    "content",
		Embedder: *embedder,
		Chunker:  chunker,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to configure embedding index: %w", err)
	}

	return &embeddingIndexConfig, nil
}

// ANCHOR_END: create_embedding_index

// ANCHOR: batched_linear_merge
// performBatchedLinearMerge performs LinearMerge in batches with progress logging.
// It returns the final cursor position after processing all batches.
func performBatchedLinearMerge(
	ctx context.Context,
	client *antfly.AntflyClient,
	tableName string,
	records map[string]any,
	batchSize int,
	dryRun bool,
) (string, error) {
	// Collect the IDs and sort them. Deterministic, sorted pagination is
	// REQUIRED for the linear merge cursor logic to work correctly.
	// slices.Sort needs Go 1.21+; on older Go, use sort.Strings instead.
	sortedIDs := make([]string, 0, len(records))
	for id := range records {
		sortedIDs = append(sortedIDs, id)
	}
	slices.Sort(sortedIDs)

	totalBatches := (len(sortedIDs) + batchSize - 1) / batchSize
	fmt.Printf("Loading %d documents in %d batches of %d records\n", len(sortedIDs), totalBatches, batchSize)

	cursor := ""
	totalUpserted := 0
	totalSkipped := 0
	totalDeleted := 0

	for batchNum := range totalBatches { // range over an int needs Go 1.22+
		// Calculate the batch range
		start := batchNum * batchSize
		end := min(start+batchSize, len(sortedIDs)) // builtin min needs Go 1.21+

		// Build the batch records map
		batchIDs := sortedIDs[start:end]
		batchRecords := make(map[string]any, len(batchIDs))
		for _, id := range batchIDs {
			batchRecords[id] = records[id]
		}

		fmt.Printf("[Batch %d/%d] Merging records %d-%d\n",
			batchNum+1, totalBatches, start+1, end)

		// Execute LinearMerge with the aknn sync level
		result, err := client.LinearMerge(ctx, tableName, antfly.LinearMergeRequest{
			Records:      batchRecords,
			LastMergedId: cursor,
			DryRun:       dryRun,
			SyncLevel:    antfly.SyncLevelAknn, // Wait for vector index writes
		})
		if err != nil {
			return "", fmt.Errorf("failed to perform linear merge for batch %d: %w", batchNum+1, err)
		}

		// Advance the cursor for the next batch
		if result.NextCursor != "" {
			cursor = result.NextCursor
		} else {
			// Fallback: use the last ID in the batch
			cursor = batchIDs[len(batchIDs)-1]
		}

		// Accumulate totals
		totalUpserted += result.Upserted
		totalSkipped += result.Skipped
		totalDeleted += result.Deleted

		// Log batch progress
		fmt.Printf(" Completed in %s - Status: %s\n", result.Took, result.Status)
		fmt.Printf(" Upserted: %d, Skipped: %d, Deleted: %d, Keys scanned: %d\n",
			result.Upserted, result.Skipped, result.Deleted, result.KeysScanned)

		if len(result.Failed) > 0 {
			fmt.Printf(" Failed operations: %d\n", len(result.Failed))
			for i, fail := range result.Failed {
				if i >= 5 {
					fmt.Printf(" ... and %d more\n", len(result.Failed)-i)
					break
				}
				fmt.Printf(" [%d] ID=%s, Error=%s\n", i, fail.Id, fail.Error)
			}
		}
	}

	// Log the final totals
	fmt.Printf("\n=== Linear Merge Complete ===\n")
	fmt.Printf("Total batches: %d\n", totalBatches)
	fmt.Printf("Total upserted: %d\n", totalUpserted)
	fmt.Printf("Total skipped: %d\n", totalSkipped)
	fmt.Printf("Total deleted: %d\n", totalDeleted)

	return cursor, nil
}

// ANCHOR_END: batched_linear_merge

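// The batch arithmetic above can be checked in isolation. batchRanges is an
// illustrative sketch (a hypothetical helper, not used by the commands) that
// splits n sorted IDs into [start, end) index pairs exactly as
// performBatchedLinearMerge iterates them.
func batchRanges(n, batchSize int) [][2]int {
	// Ceiling division: the last batch may be smaller than batchSize
	totalBatches := (n + batchSize - 1) / batchSize
	ranges := make([][2]int, 0, totalBatches)
	for batchNum := 0; batchNum < totalBatches; batchNum++ {
		start := batchNum * batchSize
		end := min(start+batchSize, n) // builtin min, Go 1.21+
		ranges = append(ranges, [2]int{start, end})
	}
	return ranges
}
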
// waitForShardsReady polls the table status until shards are ready to accept writes.
func waitForShardsReady(ctx context.Context, client *antfly.AntflyClient, tableName string, timeout time.Duration) error {
	fmt.Printf("Waiting for shards to be ready...\n")

	deadline := time.Now().Add(timeout)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	pollCount := 0

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled while waiting for shards: %w", ctx.Err())
		case <-ticker.C:
			pollCount++

			if time.Now().After(deadline) {
				return fmt.Errorf("timeout waiting for shards after %d polls", pollCount)
			}

			// Get the table status
			status, err := client.GetTable(ctx, tableName)
			if err != nil {
				fmt.Printf(" [Poll %d] Error getting table status: %v\n", pollCount, err)
				continue
			}

			// Check that we have at least one shard
			if len(status.Shards) > 0 {
				// Keep polling a little longer so leader election can complete and propagate
				if pollCount >= 6 {
					fmt.Printf("✓ Shards ready after %d polls (~%dms)\n\n", pollCount, pollCount*500)
					return nil
				}
				fmt.Printf(" [Poll %d] Found %d shard(s), waiting for leader status to propagate\n", pollCount, len(status.Shards))
			} else {
				fmt.Printf(" [Poll %d] No shards found yet\n", pollCount)
			}
		}
	}
}

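// pollUntil is an illustrative, generic version (a hypothetical helper, not
// used by the commands) of the ticker-plus-deadline pattern in
// waitForShardsReady above: it runs check on every tick until check reports
// ready, the context is cancelled, or the timeout elapses.
func pollUntil(ctx context.Context, interval, timeout time.Duration, check func() bool) error {
	deadline := time.Now().Add(timeout)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Propagate the cancellation cause to the caller
			return ctx.Err()
		case <-ticker.C:
			if time.Now().After(deadline) {
				return fmt.Errorf("timeout after %s", timeout)
			}
			if check() {
				return nil
			}
		}
	}
}
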
func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "docsaf - Documentation Sync to Antfly\n\n")
		fmt.Fprintf(os.Stderr, "Usage:\n")
		fmt.Fprintf(os.Stderr, " docsaf prepare [flags] - Process files and create sorted JSON data\n")
		fmt.Fprintf(os.Stderr, " docsaf load [flags]    - Load JSON data into Antfly\n")
		fmt.Fprintf(os.Stderr, " docsaf sync [flags]    - Full pipeline (prepare + load)\n")
		fmt.Fprintf(os.Stderr, "\nCommands:\n")
		fmt.Fprintf(os.Stderr, " prepare  Process documentation files and save to JSON\n")
		fmt.Fprintf(os.Stderr, " load     Load prepared JSON data into an Antfly table\n")
		fmt.Fprintf(os.Stderr, " sync     Process files and load directly (no intermediate JSON file)\n")
		fmt.Fprintf(os.Stderr, "\nExamples:\n")
		fmt.Fprintf(os.Stderr, " # Prepare data\n")
		fmt.Fprintf(os.Stderr, " docsaf prepare --dir /path/to/docs --output docs.json\n\n")
		fmt.Fprintf(os.Stderr, " # Load prepared data\n")
		fmt.Fprintf(os.Stderr, " docsaf load --input docs.json --table docs --create-table\n\n")
		fmt.Fprintf(os.Stderr, " # Full pipeline\n")
		fmt.Fprintf(os.Stderr, " docsaf sync --dir /path/to/docs --table docs --create-table\n\n")
		os.Exit(1)
	}

	var err error
	switch os.Args[1] {
	case "prepare":
		err = prepareCmd(os.Args[2:])
	case "load":
		err = loadCmd(os.Args[2:])
	case "sync":
		err = syncCmd(os.Args[2:])
	default:
		fmt.Fprintf(os.Stderr, "Unknown command: %s\n", os.Args[1])
		fmt.Fprintf(os.Stderr, "Valid commands: prepare, load, sync\n")
		os.Exit(1)
	}

	if err != nil {
		log.Fatalf("Error: %v", err)
	}
}