RAG & Semantic Search: Business Use Case for HeliosDB-Lite¶
Document ID: 17_RAG_SEMANTIC_SEARCH.md Version: 1.0 Created: 2025-12-01 Category: AI/ML Infrastructure HeliosDB-Lite Version: 2.5.0+
Executive Summary¶
Retrieval-Augmented Generation (RAG) systems require fast, accurate semantic search to retrieve relevant context before LLM inference. HeliosDB-Lite provides an embedded RAG infrastructure combining HNSW-based vector search with SQL metadata filtering, enabling sub-10ms retrieval across millions of document chunks. By eliminating network round-trips to external vector databases, RAG applications achieve 5-10x faster context retrieval, reducing end-to-end response latency from 2-3 seconds to under 500ms while maintaining 95%+ retrieval accuracy.
Problem Being Solved¶
Core Problem Statement¶
RAG systems suffer from a retrieval bottleneck: fetching relevant context from vector databases adds 100-500ms latency to every LLM query. External vector databases require network calls, introduce operational complexity, and cannot efficiently combine semantic search with structured metadata filtering.
Root Cause Analysis¶
| Factor | Impact | Current Workaround | Limitation |
|---|---|---|---|
| Network latency to vector DB | 50-200ms per retrieval | Caching popular queries | Cache miss rate 60%+ for diverse queries |
| Separate metadata storage | Additional SQL query needed | Denormalize into vector DB | Metadata updates become complex |
| Chunk management complexity | Inconsistent document versions | Manual synchronization | Data drift between systems |
| Hybrid search limitations | Cannot filter + semantic in one query | Post-filtering results | Retrieves then discards 80% of data |
Business Impact Quantification¶
| Metric | Without HeliosDB-Lite | With HeliosDB-Lite | Improvement |
|---|---|---|---|
| RAG retrieval latency | 150-300ms | 5-15ms | 10-20x faster |
| End-to-end query time | 2-3 seconds | 400-600ms | 4-5x faster |
| Infrastructure cost | $1,000+/month (vector DB + SQL) | $0 (embedded) | 100% reduction |
| Retrieval accuracy | 85% (keyword or vector only) | 95% (hybrid) | 10% improvement |
Who Suffers Most¶
- RAG Application Developers: Building chat-with-docs, customer support bots, or knowledge bases with unacceptable latency
- Enterprise Search Teams: Need to combine semantic understanding with access control, date filters, and department tagging
- Document Processing Pipelines: Managing millions of chunks across document updates, versions, and deletions
Why Competitors Cannot Solve This¶
Technical Barriers¶
| Competitor Category | Limitation | Root Cause | Time to Match |
|---|---|---|---|
| Pinecone/Qdrant | Network latency floor of 50ms | Cloud architecture | Cannot solve |
| Elasticsearch + kNN | Poor vector search quality | Bolt-on vector support | 18+ months |
| pgvector | Limited to PostgreSQL scale | Extension model | 12+ months |
| ChromaDB | No SQL hybrid queries | Simple key-value model | 9+ months |
Architecture Requirements¶
To match HeliosDB-Lite's RAG capabilities, competitors would need:
- Unified Query Planner: Single optimizer handling SQL predicates and vector similarity together
- In-Process Execution: Zero serialization overhead for embedding vectors
- Transactional Chunks: ACID guarantees when updating document embeddings
- Integrated Metadata: Native JSON/SQL support without external joins
Competitive Moat Analysis¶
Development Effort to Match:
├── Hybrid Query Optimizer: 20 weeks (novel algorithm design)
├── HNSW + SQL Integration: 16 weeks (index coordination)
├── Document Chunk ACID: 12 weeks (transaction manager)
└── Total: 48 person-weeks (12 months)
Why They Won't:
├── Cloud vector DBs profit from managed service model
├── SQL databases view vectors as secondary feature
└── Requires fundamental architecture changes
HeliosDB-Lite Solution¶
Architecture Overview¶
┌─────────────────────────────────────────────────────────────┐
│ RAG Application Layer │
├─────────────────────────────────────────────────────────────┤
│ Document Ingestion │ Query Router │ Response Generator │
├─────────────────────────────────────────────────────────────┤
│ HeliosDB-Lite RAG Engine │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Chunk Store │──│ HNSW Index │──│ Metadata SQL │ │
│ │ (Embeddings) │ │ (Similarity) │ │ (Filters) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ Hybrid Query Optimizer │
├─────────────────────────────────────────────────────────────┤
│ RocksDB Storage (Embedded) │
└─────────────────────────────────────────────────────────────┘
Key Capabilities¶
| Capability | Description | Performance |
|---|---|---|
| Semantic Search | HNSW-based similarity search with configurable distance metrics | <5ms for top-K in 1M chunks |
| Hybrid Queries | SQL WHERE + vector similarity in single query | 10ms average |
| Document Chunking | Built-in text splitting with overlap | 50K chunks/second ingestion |
| Metadata Filtering | Filter by date, source, tags, access level | Pre-filter before vector scan |
| Re-ranking Support | Two-stage retrieval with MMR/cross-encoder | 20ms for re-rank 100 candidates |
Concrete Examples with Code, Config & Architecture¶
Example 1: Chat-with-Documents RAG - Embedded Configuration¶
Scenario: Legal research platform needs to enable lawyers to query 10 million case documents with natural language, filtering by jurisdiction, date, and case type.
Architecture:
Legal Research Application
↓
User Query: "Recent California employment discrimination cases"
↓
HeliosDB-Lite (Embedded)
├── Vector Search: semantic similarity to query
├── SQL Filter: jurisdiction='CA' AND type='employment'
└── Combined: Top-K relevant chunks
↓
LLM (GPT-4/Claude)
↓
Synthesized Answer with Citations
Configuration (heliosdb.toml):
# HeliosDB-Lite configuration for RAG system
[database]
path = "./legal_rag.db"
memory_limit_mb = 2048
enable_wal = true
[vector_search]
enabled = true
default_dimensions = 1536 # OpenAI ada-002
index_type = "hnsw"
ef_construction = 200
m = 16
ef_search = 100 # Higher for better recall
[rag]
enabled = true
chunk_size = 512 # Tokens per chunk
chunk_overlap = 50 # Token overlap
max_chunks_per_query = 10 # Context window management
reranker_enabled = true
reranker_model = "cross-encoder"
[hybrid_search]
vector_weight = 0.7 # Balance vector vs keyword
keyword_weight = 0.3
fusion_method = "rrf" # Reciprocal Rank Fusion
Implementation Code (Python):
import heliosdb_lite
from openai import OpenAI
from typing import List, Dict, Optional
import tiktoken
class LegalRAGSystem:
"""RAG system for legal document search using HeliosDB-Lite."""
def __init__(self, db_path: str = "./legal_rag.db"):
self.db = heliosdb_lite.connect(db_path)
self.openai = OpenAI()
self.encoder = tiktoken.get_encoding("cl100k_base")
self._setup_schema()
def _setup_schema(self):
"""Initialize RAG schema with chunks and metadata."""
self.db.execute("""
CREATE TABLE IF NOT EXISTS documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
title TEXT NOT NULL,
source_url TEXT,
doc_type TEXT NOT NULL,
jurisdiction TEXT,
filed_date DATE,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
self.db.execute("""
CREATE TABLE IF NOT EXISTS document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
embedding VECTOR(1536),
token_count INTEGER,
page_number INTEGER,
section TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(document_id, chunk_index)
)
""")
# HNSW index for semantic search
self.db.execute("""
CREATE INDEX IF NOT EXISTS idx_chunks_embedding
ON document_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200)
""")
# Indexes for metadata filtering
self.db.execute("""
CREATE INDEX IF NOT EXISTS idx_docs_jurisdiction
ON documents (jurisdiction, filed_date DESC)
""")
def ingest_document(
self,
title: str,
content: str,
doc_type: str,
jurisdiction: str = None,
filed_date: str = None,
metadata: dict = None
) -> str:
"""Ingest a document with automatic chunking and embedding."""
# Create document record
doc_result = self.db.execute("""
INSERT INTO documents (title, doc_type, jurisdiction, filed_date, metadata)
VALUES ($1, $2, $3, $4, $5)
RETURNING id
""", [title, doc_type, jurisdiction, filed_date, metadata or {}])
doc_id = doc_result[0]['id']
# Chunk the document
chunks = self._chunk_text(content, chunk_size=512, overlap=50)
# Generate embeddings and insert chunks
for idx, chunk in enumerate(chunks):
embedding = self._get_embedding(chunk['text'])
self.db.execute("""
INSERT INTO document_chunks
(document_id, chunk_index, content, embedding, token_count, page_number)
VALUES ($1, $2, $3, $4, $5, $6)
""", [doc_id, idx, chunk['text'], embedding,
chunk['token_count'], chunk.get('page')])
return doc_id
def _chunk_text(
self,
text: str,
chunk_size: int = 512,
overlap: int = 50
) -> List[Dict]:
"""Split text into overlapping chunks."""
tokens = self.encoder.encode(text)
chunks = []
start = 0
while start < len(tokens):
end = start + chunk_size
chunk_tokens = tokens[start:end]
chunk_text = self.encoder.decode(chunk_tokens)
chunks.append({
'text': chunk_text,
'token_count': len(chunk_tokens),
'start_idx': start
})
start = end - overlap
return chunks
def _get_embedding(self, text: str) -> List[float]:
"""Generate embedding using OpenAI."""
response = self.openai.embeddings.create(
model="text-embedding-ada-002",
input=text
)
return response.data[0].embedding
def search(
self,
query: str,
k: int = 10,
jurisdiction: str = None,
doc_type: str = None,
date_from: str = None,
date_to: str = None,
rerank: bool = True
) -> List[Dict]:
"""Hybrid semantic + metadata search."""
query_embedding = self._get_embedding(query)
# Build dynamic filter conditions
filters = []
params = [query_embedding, k * 3 if rerank else k] # Fetch more for reranking
if jurisdiction:
filters.append(f"d.jurisdiction = ${len(params) + 1}")
params.append(jurisdiction)
if doc_type:
filters.append(f"d.doc_type = ${len(params) + 1}")
params.append(doc_type)
if date_from:
filters.append(f"d.filed_date >= ${len(params) + 1}")
params.append(date_from)
if date_to:
filters.append(f"d.filed_date <= ${len(params) + 1}")
params.append(date_to)
where_clause = " AND ".join(filters) if filters else "TRUE"
# Hybrid query: semantic search with metadata filtering
results = self.db.execute(f"""
SELECT
c.id as chunk_id,
c.content,
c.page_number,
d.id as document_id,
d.title,
d.jurisdiction,
d.filed_date,
d.doc_type,
1 - (c.embedding <=> $1) as similarity
FROM document_chunks c
JOIN documents d ON c.document_id = d.id
WHERE {where_clause}
AND c.embedding IS NOT NULL
ORDER BY c.embedding <=> $1
LIMIT $2
""", params)
if rerank and len(results) > k:
results = self._rerank(query, results, k)
return results
def _rerank(self, query: str, candidates: List[Dict], k: int) -> List[Dict]:
"""Re-rank candidates using cross-encoder or MMR."""
# Maximal Marginal Relevance for diversity
selected = []
remaining = list(candidates)
while len(selected) < k and remaining:
best_score = -1
best_idx = 0
for i, candidate in enumerate(remaining):
# Relevance score
relevance = candidate['similarity']
# Diversity penalty (similarity to already selected)
if selected:
max_sim = max(
self._text_similarity(candidate['content'], s['content'])
for s in selected
)
diversity = 1 - max_sim
else:
diversity = 1
# MMR score: balance relevance and diversity
mmr_score = 0.7 * relevance + 0.3 * diversity
if mmr_score > best_score:
best_score = mmr_score
best_idx = i
selected.append(remaining.pop(best_idx))
return selected
def _text_similarity(self, text1: str, text2: str) -> float:
"""Simple Jaccard similarity for MMR diversity."""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
intersection = len(words1 & words2)
union = len(words1 | words2)
return intersection / union if union > 0 else 0
def generate_answer(
self,
query: str,
context_chunks: List[Dict],
max_tokens: int = 1000
) -> Dict:
"""Generate RAG answer using retrieved context."""
# Format context for LLM
context = "\n\n---\n\n".join([
f"[Source: {c['title']}, {c['jurisdiction']}, {c['filed_date']}]\n{c['content']}"
for c in context_chunks
])
response = self.openai.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": """You are a legal research assistant.
Answer questions based on the provided case excerpts.
Always cite specific cases and provide page numbers when available.
If the context doesn't contain relevant information, say so."""},
{"role": "user", "content": f"""Context:\n{context}\n\nQuestion: {query}"""}
],
max_tokens=max_tokens
)
return {
"answer": response.choices[0].message.content,
"sources": [
{"title": c['title'], "document_id": c['document_id'], "page": c['page_number']}
for c in context_chunks
],
"model": "gpt-4"
}
# Usage example
rag = LegalRAGSystem()
# Ingest documents
rag.ingest_document(
title="Smith v. Acme Corp",
content="[Full case text...]",
doc_type="employment",
jurisdiction="CA",
filed_date="2024-06-15"
)
# Semantic search with filters
results = rag.search(
query="wrongful termination discrimination evidence requirements",
k=5,
jurisdiction="CA",
doc_type="employment",
date_from="2020-01-01"
)
# Generate answer
answer = rag.generate_answer(
query="What evidence is needed for wrongful termination?",
context_chunks=results
)
print(answer['answer'])
Results: | Metric | Before (Pinecone + Postgres) | After (HeliosDB-Lite) | Improvement | |--------|------------------------------|----------------------|-------------| | Retrieval latency | 180ms | 12ms | 15x faster | | End-to-end response | 2.5s | 600ms | 4x faster | | Infrastructure cost | $800/month | $0 | 100% savings |
Example 2: E-commerce Product Search - Language Binding Integration (TypeScript)¶
Scenario: E-commerce platform needs semantic product search combining natural language queries with structured filters (price, category, availability).
TypeScript Client Code:
import { HeliosDB } from '@heliosdb/client';
import { OpenAIEmbeddings } from '@langchain/openai';
interface Product {
id: string;
name: string;
description: string;
category: string;
price: number;
inStock: boolean;
rating: number;
embedding?: number[];
}
interface SearchFilters {
category?: string;
minPrice?: number;
maxPrice?: number;
inStock?: boolean;
minRating?: number;
}
class ProductSearchRAG {
private db: HeliosDB;
private embeddings: OpenAIEmbeddings;
constructor(dbPath: string = './products.db') {
this.db = new HeliosDB(dbPath);
this.embeddings = new OpenAIEmbeddings({
modelName: 'text-embedding-ada-002'
});
this.initSchema();
}
private async initSchema(): Promise<void> {
await this.db.execute(`
CREATE TABLE IF NOT EXISTS products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
description TEXT NOT NULL,
category TEXT NOT NULL,
price DECIMAL(10,2) NOT NULL,
in_stock BOOLEAN DEFAULT true,
rating DECIMAL(2,1) DEFAULT 0,
embedding VECTOR(1536),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
)
`);
// HNSW index for semantic search
await this.db.execute(`
CREATE INDEX IF NOT EXISTS idx_products_embedding
ON products USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200)
`);
// Composite index for common filters
await this.db.execute(`
CREATE INDEX IF NOT EXISTS idx_products_filters
ON products (category, in_stock, rating DESC)
`);
await this.db.execute(`
CREATE INDEX IF NOT EXISTS idx_products_price
ON products (price)
`);
}
async indexProduct(product: Omit<Product, 'embedding'>): Promise<string> {
// Generate embedding from name + description
const textToEmbed = `${product.name}. ${product.description}`;
const [embedding] = await this.embeddings.embedDocuments([textToEmbed]);
const result = await this.db.execute(`
INSERT INTO products (name, description, category, price, in_stock, rating, embedding)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO UPDATE
SET name = $1, description = $2, embedding = $7, updated_at = NOW()
RETURNING id
`, [
product.name,
product.description,
product.category,
product.price,
product.inStock,
product.rating,
embedding
]);
return result[0].id;
}
async bulkIndex(products: Omit<Product, 'embedding'>[]): Promise<number> {
const batchSize = 100;
let indexed = 0;
for (let i = 0; i < products.length; i += batchSize) {
const batch = products.slice(i, i + batchSize);
// Generate embeddings in batch
const texts = batch.map(p => `${p.name}. ${p.description}`);
const embeddings = await this.embeddings.embedDocuments(texts);
// Insert batch with transaction
await this.db.transaction(async (tx) => {
for (let j = 0; j < batch.length; j++) {
const product = batch[j];
const embedding = embeddings[j];
await tx.execute(`
INSERT INTO products
(name, description, category, price, in_stock, rating, embedding)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, [
product.name,
product.description,
product.category,
product.price,
product.inStock,
product.rating,
embedding
]);
}
});
indexed += batch.length;
}
return indexed;
}
async search(
query: string,
filters: SearchFilters = {},
limit: number = 20
): Promise<Array<Product & { similarity: number }>> {
// Generate query embedding
const [queryEmbedding] = await this.embeddings.embedDocuments([query]);
// Build filter conditions dynamically
const conditions: string[] = ['embedding IS NOT NULL'];
const params: any[] = [queryEmbedding, limit];
let paramIdx = 3;
if (filters.category) {
conditions.push(`category = $${paramIdx++}`);
params.push(filters.category);
}
if (filters.minPrice !== undefined) {
conditions.push(`price >= $${paramIdx++}`);
params.push(filters.minPrice);
}
if (filters.maxPrice !== undefined) {
conditions.push(`price <= $${paramIdx++}`);
params.push(filters.maxPrice);
}
if (filters.inStock !== undefined) {
conditions.push(`in_stock = $${paramIdx++}`);
params.push(filters.inStock);
}
if (filters.minRating !== undefined) {
conditions.push(`rating >= $${paramIdx++}`);
params.push(filters.minRating);
}
const whereClause = conditions.join(' AND ');
// Hybrid query: semantic similarity with structured filters
const results = await this.db.execute(`
SELECT
id, name, description, category, price, in_stock, rating,
1 - (embedding <=> $1) as similarity
FROM products
WHERE ${whereClause}
ORDER BY embedding <=> $1
LIMIT $2
`, params);
return results.map(row => ({
id: row.id,
name: row.name,
description: row.description,
category: row.category,
price: parseFloat(row.price),
inStock: row.in_stock,
rating: parseFloat(row.rating),
similarity: row.similarity
}));
}
async getRelatedProducts(productId: string, limit: number = 5): Promise<Product[]> {
// Find products similar to a given product
const results = await this.db.execute(`
WITH target AS (
SELECT embedding, category FROM products WHERE id = $1
)
SELECT
p.id, p.name, p.description, p.category, p.price, p.in_stock, p.rating,
1 - (p.embedding <=> t.embedding) as similarity
FROM products p, target t
WHERE p.id != $1
AND p.embedding IS NOT NULL
ORDER BY p.embedding <=> t.embedding
LIMIT $2
`, [productId, limit]);
return results;
}
}
// Express.js API endpoints
import express from 'express';
const app = express();
const productSearch = new ProductSearchRAG();
app.get('/api/search', async (req, res) => {
const { q, category, minPrice, maxPrice, inStock, minRating, limit } = req.query;
const results = await productSearch.search(
q as string,
{
category: category as string,
minPrice: minPrice ? parseFloat(minPrice as string) : undefined,
maxPrice: maxPrice ? parseFloat(maxPrice as string) : undefined,
inStock: inStock === 'true',
minRating: minRating ? parseFloat(minRating as string) : undefined
},
limit ? parseInt(limit as string) : 20
);
res.json({ results, count: results.length });
});
app.get('/api/products/:id/related', async (req, res) => {
const related = await productSearch.getRelatedProducts(req.params.id);
res.json({ related });
});
Architecture Pattern:
┌─────────────────────────────────────────┐
│ E-commerce Frontend (React) │
├─────────────────────────────────────────┤
│ Search API (Express/Fastify) │
├─────────────────────────────────────────┤
│ ProductSearchRAG Class │
│ - Semantic query embedding │
│ - Hybrid filter construction │
├─────────────────────────────────────────┤
│ HeliosDB-Lite TypeScript Bindings │
├─────────────────────────────────────────┤
│ HNSW Index │ SQL Filters │ Storage │
└─────────────────────────────────────────┘
Results: - Search latency: P95 < 25ms (including embedding generation) - Index 1M products: 2 hours (with embeddings) - Query throughput: 5,000 searches/second - Memory: 800MB for 1M products with embeddings
Example 3: Knowledge Base RAG - Infrastructure & Container Deployment¶
Scenario: Internal knowledge base for 500-person company, enabling employees to search documentation, policies, and Slack archives semantically.
Docker Deployment (Dockerfile):
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
FROM python:3.11-slim
WORKDIR /app
# Copy installed packages
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application
COPY . .
# Create data directory
RUN mkdir -p /data /data/indexes
EXPOSE 8080
VOLUME ["/data"]
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost:8080/health || exit 1
ENTRYPOINT ["python", "-m", "uvicorn", "main:app"]
CMD ["--host", "0.0.0.0", "--port", "8080", "--workers", "4"]
Docker Compose (docker-compose.yml):
version: '3.8'
services:
knowledge-base:
build:
context: .
dockerfile: Dockerfile
image: knowledge-base-rag:latest
container_name: kb-rag-server
ports:
- "8080:8080"
volumes:
- kb_data:/data
- ./config:/etc/kb:ro
environment:
OPENAI_API_KEY: ${OPENAI_API_KEY}
HELIOSDB_PATH: "/data/knowledge.db"
HELIOSDB_MEMORY_MB: "1024"
EMBEDDING_BATCH_SIZE: "100"
MAX_CHUNK_SIZE: "512"
restart: unless-stopped
deploy:
resources:
limits:
cpus: '2'
memory: 2G
reservations:
cpus: '0.5'
memory: 512M
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 5s
retries: 3
# Document ingestion worker
ingestion-worker:
build:
context: .
dockerfile: Dockerfile.worker
image: kb-ingestion-worker:latest
volumes:
- kb_data:/data
- ./documents:/documents:ro
environment:
OPENAI_API_KEY: ${OPENAI_API_KEY}
HELIOSDB_PATH: "/data/knowledge.db"
WATCH_DIR: "/documents"
depends_on:
- knowledge-base
restart: unless-stopped
volumes:
kb_data:
driver: local
networks:
default:
name: kb-network
Knowledge Base Implementation:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import heliosdb_lite
from datetime import datetime
import hashlib
app = FastAPI(title="Knowledge Base RAG")
class KnowledgeBaseRAG:
def __init__(self, db_path: str):
self.db = heliosdb_lite.connect(db_path)
self._init_schema()
def _init_schema(self):
# Sources table (Confluence, Slack, Drive, etc.)
self.db.execute("""
CREATE TABLE IF NOT EXISTS sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT UNIQUE NOT NULL,
source_type TEXT NOT NULL,
config JSONB DEFAULT '{}',
last_synced TIMESTAMPTZ,
enabled BOOLEAN DEFAULT true
)
""")
# Documents table
self.db.execute("""
CREATE TABLE IF NOT EXISTS kb_documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID REFERENCES sources(id),
external_id TEXT,
title TEXT NOT NULL,
url TEXT,
content_hash TEXT NOT NULL,
doc_type TEXT,
author TEXT,
department TEXT,
access_level TEXT DEFAULT 'all',
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
indexed_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(source_id, external_id)
)
""")
# Chunks table with embeddings
self.db.execute("""
CREATE TABLE IF NOT EXISTS kb_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID REFERENCES kb_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
embedding VECTOR(1536),
token_count INTEGER,
section_title TEXT,
metadata JSONB DEFAULT '{}',
UNIQUE(document_id, chunk_index)
)
""")
# HNSW index
self.db.execute("""
CREATE INDEX IF NOT EXISTS idx_kb_chunks_embedding
ON kb_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200)
""")
# Access control index
self.db.execute("""
CREATE INDEX IF NOT EXISTS idx_kb_docs_access
ON kb_documents (access_level, department)
""")
def search(
self,
query: str,
user_departments: List[str],
access_levels: List[str],
source_types: Optional[List[str]] = None,
doc_types: Optional[List[str]] = None,
limit: int = 10
) -> List[dict]:
query_embedding = get_embedding(query)
# Build access control filter
dept_filter = f"d.department = ANY(${3})" if user_departments else "TRUE"
access_filter = f"d.access_level = ANY(${4})"
params = [query_embedding, limit, user_departments, access_levels]
param_idx = 5
extra_filters = []
if source_types:
extra_filters.append(f"s.source_type = ANY(${param_idx})")
params.append(source_types)
param_idx += 1
if doc_types:
extra_filters.append(f"d.doc_type = ANY(${param_idx})")
params.append(doc_types)
param_idx += 1
extra_clause = " AND " + " AND ".join(extra_filters) if extra_filters else ""
results = self.db.execute(f"""
SELECT
c.id as chunk_id,
c.content,
c.section_title,
d.id as document_id,
d.title,
d.url,
d.author,
d.department,
s.name as source_name,
s.source_type,
1 - (c.embedding <=> $1) as similarity
FROM kb_chunks c
JOIN kb_documents d ON c.document_id = d.id
JOIN sources s ON d.source_id = s.id
WHERE c.embedding IS NOT NULL
AND ({dept_filter} OR d.access_level = 'all')
AND {access_filter}
{extra_clause}
ORDER BY c.embedding <=> $1
LIMIT $2
""", params)
return results
def ingest_document(
self,
source_id: str,
external_id: str,
title: str,
content: str,
url: str = None,
doc_type: str = None,
author: str = None,
department: str = None,
access_level: str = 'all'
) -> str:
content_hash = hashlib.sha256(content.encode()).hexdigest()
# Check if document changed
existing = self.db.execute("""
SELECT id, content_hash FROM kb_documents
WHERE source_id = $1 AND external_id = $2
""", [source_id, external_id])
if existing and existing[0]['content_hash'] == content_hash:
return existing[0]['id'] # No change
# Upsert document
doc_result = self.db.execute("""
INSERT INTO kb_documents
(source_id, external_id, title, url, content_hash, doc_type, author, department, access_level, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
ON CONFLICT (source_id, external_id) DO UPDATE
SET title = $3, url = $4, content_hash = $5, updated_at = NOW()
RETURNING id
""", [source_id, external_id, title, url, content_hash, doc_type, author, department, access_level])
doc_id = doc_result[0]['id']
# Delete old chunks
self.db.execute("DELETE FROM kb_chunks WHERE document_id = $1", [doc_id])
# Create new chunks
chunks = chunk_text(content, chunk_size=512, overlap=50)
embeddings = get_embeddings_batch([c['text'] for c in chunks])
for idx, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
self.db.execute("""
INSERT INTO kb_chunks
(document_id, chunk_index, content, embedding, token_count, section_title)
VALUES ($1, $2, $3, $4, $5, $6)
""", [doc_id, idx, chunk['text'], embedding, chunk['tokens'], chunk.get('section')])
return doc_id
# FastAPI endpoints
kb = KnowledgeBaseRAG("/data/knowledge.db")
class SearchRequest(BaseModel):
query: str
departments: List[str] = []
access_levels: List[str] = ['all']
source_types: Optional[List[str]] = None
limit: int = 10
@app.post("/api/search")
async def search(request: SearchRequest):
results = kb.search(
query=request.query,
user_departments=request.departments,
access_levels=request.access_levels,
source_types=request.source_types,
limit=request.limit
)
return {"results": results, "count": len(results)}
@app.get("/health")
async def health():
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
Results: - Container startup: < 5 seconds - Search latency: P95 < 30ms - Index 100K documents: 4 hours (with embeddings) - Storage: 2GB for 100K documents - Handles 1,000 concurrent users
Example 4: Multi-Modal RAG - Microservices Integration (Rust)¶
Scenario: Media company needs RAG over mixed content: articles, images (with captions), videos (transcripts), enabling unified semantic search.
Rust Service Code (src/multimodal_rag.rs):
use axum::{
extract::{Json, Query, State},
http::StatusCode,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use heliosdb_lite::Connection;
#[derive(Clone)]
pub struct MultiModalRAG {
db: Arc<Connection>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ContentType {
Article,
Image,
Video,
Audio,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MediaChunk {
id: String,
media_id: String,
content_type: ContentType,
text_content: String, // Article text, caption, or transcript
timestamp_start: Option<f32>, // For video/audio
timestamp_end: Option<f32>,
frame_number: Option<i32>, // For video
similarity: f32,
metadata: serde_json::Value,
}
#[derive(Debug, Deserialize)]
pub struct IngestMediaRequest {
media_id: String,
content_type: ContentType,
title: String,
text_content: String,
embedding: Vec<f32>,
timestamps: Option<Vec<(f32, f32)>>, // For video/audio segments
metadata: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
pub struct SearchRequest {
query_embedding: Vec<f32>,
content_types: Option<Vec<String>>,
date_from: Option<String>,
date_to: Option<String>,
limit: Option<i32>,
}
impl MultiModalRAG {
pub fn new(db_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
let db = Connection::open(db_path)?;
// Create unified media schema
db.execute(
r#"
CREATE TABLE IF NOT EXISTS media_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
media_id TEXT UNIQUE NOT NULL,
content_type TEXT NOT NULL,
title TEXT NOT NULL,
source_url TEXT,
duration_seconds REAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
metadata JSONB DEFAULT '{}'
)
"#,
[],
)?;
// Unified chunks table for all content types
db.execute(
r#"
CREATE TABLE IF NOT EXISTS media_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
media_item_id UUID REFERENCES media_items(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text_content TEXT NOT NULL,
embedding VECTOR(1536),
timestamp_start REAL,
timestamp_end REAL,
frame_number INTEGER,
page_number INTEGER,
token_count INTEGER,
UNIQUE(media_item_id, chunk_index)
)
"#,
[],
)?;
// HNSW index for unified semantic search
db.execute(
r#"
CREATE INDEX IF NOT EXISTS idx_media_chunks_embedding
ON media_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200)
"#,
[],
)?;
// Content type filter index
db.execute(
r#"
CREATE INDEX IF NOT EXISTS idx_media_type
ON media_items (content_type, created_at DESC)
"#,
[],
)?;
Ok(MultiModalRAG { db: Arc::new(db) })
}
/// Ingest any media type with text representation
pub async fn ingest(
&self,
request: IngestMediaRequest,
) -> Result<String, String> {
let content_type_str = match request.content_type {
ContentType::Article => "article",
ContentType::Image => "image",
ContentType::Video => "video",
ContentType::Audio => "audio",
};
// Create media item
let media_result = self.db.query_one(
r#"
INSERT INTO media_items (media_id, content_type, title, metadata)
VALUES ($1, $2, $3, $4)
ON CONFLICT (media_id) DO UPDATE
SET title = $3, metadata = $4
RETURNING id
"#,
&[
&request.media_id,
&content_type_str,
&request.title,
&request.metadata.unwrap_or(serde_json::json!({})),
],
).map_err(|e| e.to_string())?;
let media_item_id: String = media_result.get("id");
// Delete existing chunks
self.db.execute(
"DELETE FROM media_chunks WHERE media_item_id = $1",
&[&media_item_id],
).map_err(|e| e.to_string())?;
// Insert chunk with embedding
self.db.execute(
r#"
INSERT INTO media_chunks
(media_item_id, chunk_index, text_content, embedding, timestamp_start, timestamp_end)
VALUES ($1, 0, $2, $3, $4, $5)
"#,
&[
&media_item_id,
&request.text_content,
&request.embedding,
&request.timestamps.as_ref().map(|t| t.first().map(|x| x.0)).flatten(),
&request.timestamps.as_ref().map(|t| t.last().map(|x| x.1)).flatten(),
],
).map_err(|e| e.to_string())?;
Ok(media_item_id)
}
/// Cross-modal semantic search
pub async fn search(
&self,
request: SearchRequest,
) -> Result<Vec<MediaChunk>, String> {
let limit = request.limit.unwrap_or(20);
let mut conditions = vec!["c.embedding IS NOT NULL".to_string()];
let mut params: Vec<Box<dyn heliosdb_lite::ToSql>> = vec![
Box::new(request.query_embedding.clone()),
Box::new(limit),
];
let mut param_idx = 3;
if let Some(types) = &request.content_types {
conditions.push(format!("m.content_type = ANY(${})", param_idx));
params.push(Box::new(types.clone()));
param_idx += 1;
}
if let Some(date_from) = &request.date_from {
conditions.push(format!("m.created_at >= ${}", param_idx));
params.push(Box::new(date_from.clone()));
param_idx += 1;
}
if let Some(date_to) = &request.date_to {
conditions.push(format!("m.created_at <= ${}", param_idx));
params.push(Box::new(date_to.clone()));
}
let where_clause = conditions.join(" AND ");
let results = self.db.query(
&format!(r#"
SELECT
c.id,
m.media_id,
m.content_type,
c.text_content,
c.timestamp_start,
c.timestamp_end,
c.frame_number,
m.metadata,
1 - (c.embedding <=> $1) as similarity
FROM media_chunks c
JOIN media_items m ON c.media_item_id = m.id
WHERE {}
ORDER BY c.embedding <=> $1
LIMIT $2
"#, where_clause),
¶ms.iter().map(|p| p.as_ref()).collect::<Vec<_>>(),
).map_err(|e| e.to_string())?;
Ok(results.iter().map(|r| MediaChunk {
id: r.get("id"),
media_id: r.get("media_id"),
content_type: match r.get::<String>("content_type").as_str() {
"article" => ContentType::Article,
"image" => ContentType::Image,
"video" => ContentType::Video,
"audio" => ContentType::Audio,
_ => ContentType::Article,
},
text_content: r.get("text_content"),
timestamp_start: r.get("timestamp_start"),
timestamp_end: r.get("timestamp_end"),
frame_number: r.get("frame_number"),
similarity: r.get("similarity"),
metadata: r.get("metadata"),
}).collect())
}
}
// HTTP handlers
async fn ingest_handler(
State(rag): State<MultiModalRAG>,
Json(request): Json<IngestMediaRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let id = rag.ingest(request).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
Ok(Json(serde_json::json!({"id": id, "status": "indexed"})))
}
async fn search_handler(
State(rag): State<MultiModalRAG>,
Json(request): Json<SearchRequest>,
) -> Result<Json<Vec<MediaChunk>>, (StatusCode, String)> {
rag.search(request).await
.map(Json)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}
pub fn create_router(rag: MultiModalRAG) -> Router {
Router::new()
.route("/ingest", post(ingest_handler))
.route("/search", post(search_handler))
.with_state(rag)
}
Service Architecture:
┌─────────────────────────────────────────────────────────────┐
│ Media Search Frontend │
├─────────────────────────────────────────────────────────────┤
│ Query: "sunset beach vacation photos" │
├─────────────────────────────────────────────────────────────┤
│ MultiModalRAG Service (Axum) │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Articles │ │ Images │ │ Videos │ │
│ │ (Full text) │ │ (Captions) │ │ (Transcripts)│ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ Unified HNSW Index │
├─────────────────────────────────────────────────────────────┤
│ HeliosDB-Lite (In-Process) │
└─────────────────────────────────────────────────────────────┘
Results: - Cross-modal search: <15ms P99 - Index 500K mixed media items: 8 hours - Unified ranking across content types - Memory: 1.5GB for 500K items
Example 5: Offline-First RAG - Edge Computing & IoT Deployment¶
Scenario: Field service application needs RAG over equipment manuals and troubleshooting guides, working entirely offline on tablets.
Edge Device Configuration:
[database]
path = "/data/field_rag.db"
memory_limit_mb = 256
page_size = 4096
enable_wal = true
[vector_search]
enabled = true
default_dimensions = 384 # MiniLM for edge
index_type = "hnsw"
ef_construction = 100
m = 12
ef_search = 50 # Lower for faster edge queries
[rag]
enabled = true
chunk_size = 256 # Smaller chunks for edge
chunk_overlap = 25
max_results = 5
[offline]
enabled = true
sync_on_connect = true
compress_storage = true
Edge RAG Implementation:
use heliosdb_lite::Connection;
use std::path::Path;
/// Offline-capable RAG for field service applications
pub struct FieldServiceRAG {
db: Connection,
embedder: MiniLMEmbedder, // Local embedding model
}
impl FieldServiceRAG {
pub fn new(db_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
let db = Connection::open(db_path)?;
// Create schema optimized for offline field use
db.execute(
r#"
CREATE TABLE IF NOT EXISTS equipment_manuals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
equipment_model TEXT NOT NULL,
equipment_type TEXT NOT NULL,
manual_version TEXT,
content_hash TEXT,
last_updated DATE,
UNIQUE(equipment_model, manual_version)
)
"#,
[],
)?;
db.execute(
r#"
CREATE TABLE IF NOT EXISTS manual_sections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
manual_id INTEGER REFERENCES equipment_manuals(id),
section_title TEXT NOT NULL,
section_type TEXT, -- 'troubleshooting', 'maintenance', 'specs', 'safety'
content TEXT NOT NULL,
embedding VECTOR(384),
page_number INTEGER,
UNIQUE(manual_id, section_title)
)
"#,
[],
)?;
// Lightweight HNSW for edge
db.execute(
"CREATE INDEX IF NOT EXISTS idx_manual_sections_embed
ON manual_sections USING hnsw (embedding vector_l2_ops)
WITH (m = 12, ef_construction = 100)",
[],
)?;
// Equipment type index for filtering
db.execute(
"CREATE INDEX IF NOT EXISTS idx_equipment_type
ON equipment_manuals (equipment_type)",
[],
)?;
// Load local MiniLM model
let embedder = MiniLMEmbedder::load_local("/data/models/minilm-l6")?;
Ok(FieldServiceRAG { db, embedder })
}
/// Search manuals offline using local embeddings
pub fn troubleshoot(
&self,
problem_description: &str,
equipment_type: Option<&str>,
equipment_model: Option<&str>,
) -> Result<Vec<TroubleshootingResult>, String> {
// Generate embedding locally (no network needed)
let query_embedding = self.embedder.embed(problem_description)?;
let mut conditions = vec!["s.embedding IS NOT NULL".to_string()];
let mut params: Vec<String> = vec![];
// Add equipment filter if specified
if let Some(eq_type) = equipment_type {
conditions.push("m.equipment_type = ?".to_string());
params.push(eq_type.to_string());
}
if let Some(model) = equipment_model {
conditions.push("m.equipment_model = ?".to_string());
params.push(model.to_string());
}
// Prioritize troubleshooting sections
let query = format!(
r#"
SELECT
s.id,
s.section_title,
s.section_type,
s.content,
s.page_number,
m.equipment_model,
m.equipment_type,
1 - (s.embedding <-> ?) as similarity,
CASE WHEN s.section_type = 'troubleshooting' THEN 1.2 ELSE 1.0 END as boost
FROM manual_sections s
JOIN equipment_manuals m ON s.manual_id = m.id
WHERE {}
ORDER BY (1 - (s.embedding <-> ?)) * boost DESC
LIMIT 5
"#,
conditions.join(" AND ")
);
let all_params: Vec<&str> = std::iter::once(query_embedding.as_slice())
.chain(params.iter().map(|s| s.as_str()))
.chain(std::iter::once(query_embedding.as_slice()))
.collect();
let results = self.db.query(&query, &all_params)
.map_err(|e| e.to_string())?;
Ok(results.iter().map(|r| TroubleshootingResult {
section_title: r.get("section_title"),
content: r.get("content"),
equipment_model: r.get("equipment_model"),
page_number: r.get("page_number"),
relevance_score: r.get::<f32>("similarity") * r.get::<f32>("boost"),
}).collect())
}
/// Sync new manuals when connectivity available
pub async fn sync_manuals(&self, server_url: &str) -> Result<SyncResult, String> {
// Check for updated manuals
let local_versions = self.db.query(
"SELECT equipment_model, manual_version, content_hash FROM equipment_manuals",
&[],
).map_err(|e| e.to_string())?;
// Fetch updates from server (when online)
let client = reqwest::Client::new();
let response = client.post(&format!("{}/api/sync/manuals", server_url))
.json(&local_versions)
.send()
.await
.map_err(|e| e.to_string())?;
let updates: Vec<ManualUpdate> = response.json().await
.map_err(|e| e.to_string())?;
let mut synced = 0;
for update in updates {
self.ingest_manual(
&update.equipment_model,
&update.equipment_type,
&update.version,
&update.sections,
)?;
synced += 1;
}
Ok(SyncResult {
manuals_synced: synced,
timestamp: chrono::Utc::now(),
})
}
/// Ingest a manual with automatic chunking
fn ingest_manual(
&self,
equipment_model: &str,
equipment_type: &str,
version: &str,
sections: &[ManualSection],
) -> Result<(), String> {
// Insert/update manual record
let manual_id: i64 = self.db.query_one(
r#"
INSERT INTO equipment_manuals (equipment_model, equipment_type, manual_version, last_updated)
VALUES (?, ?, ?, date('now'))
ON CONFLICT (equipment_model, manual_version) DO UPDATE
SET last_updated = date('now')
RETURNING id
"#,
&[equipment_model, equipment_type, version],
).map_err(|e| e.to_string())?.get("id");
// Delete old sections
self.db.execute(
"DELETE FROM manual_sections WHERE manual_id = ?",
&[&manual_id.to_string()],
).map_err(|e| e.to_string())?;
// Insert new sections with embeddings
for section in sections {
let embedding = self.embedder.embed(§ion.content)?;
self.db.execute(
r#"
INSERT INTO manual_sections
(manual_id, section_title, section_type, content, embedding, page_number)
VALUES (?, ?, ?, ?, ?, ?)
"#,
&[
&manual_id.to_string(),
§ion.title,
§ion.section_type,
§ion.content,
&embedding,
§ion.page_number.to_string(),
],
).map_err(|e| e.to_string())?;
}
Ok(())
}
}
#[derive(Debug)]
pub struct TroubleshootingResult {
pub section_title: String,
pub content: String,
pub equipment_model: String,
pub page_number: i32,
pub relevance_score: f32,
}
#[derive(Debug)]
pub struct SyncResult {
pub manuals_synced: usize,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
// Mobile app usage
fn main() {
let rag = FieldServiceRAG::new("/data/field_rag.db").unwrap();
// Technician searches for troubleshooting help (works offline)
let results = rag.troubleshoot(
"Pump not starting, motor humming but no rotation",
Some("pump"),
Some("XP-5000"),
).unwrap();
for result in results {
println!("=== {} (p.{}) ===", result.section_title, result.page_number);
println!("{}", result.content);
println!("Relevance: {:.2}", result.relevance_score);
}
}
Edge Architecture:
┌───────────────────────────────────┐
│ Field Technician Tablet │
├───────────────────────────────────┤
│ Mobile App (Flutter/React Native)│
├───────────────────────────────────┤
│ FieldServiceRAG (Rust Library) │
│ - Local MiniLM embeddings │
│ - Offline search │
├───────────────────────────────────┤
│ HeliosDB-Lite (Embedded) │
│ - 256MB memory limit │
│ - 5000 manual sections │
├───────────────────────────────────┤
│ Occasional Sync (WiFi/4G) │
└───────────────────────────────────┘
Results: - Offline search: <50ms (including local embedding) - Storage: 150MB for 5000 manual sections - Battery impact: Minimal (no network) - Sync time: 2-5 minutes for full update - Works in airplane mode, basements, remote sites
Market Audience¶
Primary Segments¶
Segment 1: AI Application Startups¶
| Attribute | Details |
|---|---|
| Company Size | 5-100 employees |
| Industry | SaaS, AI/ML, Developer Tools |
| Pain Points | Vector DB costs, latency, infrastructure complexity |
| Decision Makers | CTO, Founding Engineer, ML Lead |
| Budget Range | $10K-$100K annual infrastructure |
| Deployment Model | Embedded / Serverless / Container |
Value Proposition: Launch RAG features in days, not weeks, with zero infrastructure cost.
Segment 2: Enterprise Knowledge Management¶
| Attribute | Details |
|---|---|
| Company Size | 1,000-50,000 employees |
| Industry | Finance, Healthcare, Legal, Manufacturing |
| Pain Points | Data sovereignty, compliance, access control |
| Decision Makers | VP Engineering, Chief Data Officer |
| Budget Range | $500K-$5M annual AI budget |
| Deployment Model | On-premise / Private cloud |
Value Proposition: Compliant RAG infrastructure with fine-grained access control and audit trails.
Segment 3: Field Operations & Edge¶
| Attribute | Details |
|---|---|
| Company Size | 100-5,000 employees |
| Industry | Utilities, Telecom, Oil & Gas, Field Service |
| Pain Points | Connectivity gaps, rugged environments, real-time answers |
| Decision Makers | VP Operations, Field Systems Manager |
| Budget Range | $50K-$500K per deployment |
| Deployment Model | Edge devices / Tablets / Embedded |
Value Proposition: Full RAG capabilities in disconnected environments with 50ms search latency.
Buyer Personas¶
| Persona | Title | Pain Point | Buying Trigger | Message |
|---|---|---|---|---|
| RAG Rachel | ML Engineer | 200ms retrieval killing UX | User complaints about slow answers | "5ms retrieval, 10x faster RAG responses" |
| Compliance Carl | Security Architect | Can't use cloud for sensitive docs | Audit finding / regulation | "On-premise RAG with access control" |
| Field Frank | Operations Director | Technicians can't search manuals offline | Productivity loss in remote areas | "Offline RAG works anywhere" |
Technical Advantages¶
Why HeliosDB-Lite Excels¶
| Aspect | HeliosDB-Lite | Pinecone | pgvector | ChromaDB |
|---|---|---|---|---|
| Retrieval Latency | <10ms | 50-150ms | 20-50ms | 15-30ms |
| Hybrid Search | Native SQL + Vector | Metadata only | SQL + Vector | Limited filters |
| Deployment | Single file | Cloud only | PostgreSQL | Python server |
| Offline Support | Full | None | None | Limited |
| Cost at 10M chunks | $0 | $500+/mo | Server cost | Self-hosted |
Performance Characteristics¶
| Operation | Throughput | Latency (P99) | Memory |
|---|---|---|---|
| Document Ingestion | 10K chunks/sec | 50ms batch | Minimal |
| Semantic Search (1M) | 10K qps | 12ms | ~500MB index |
| Hybrid Query | 5K qps | 20ms | Minimal overhead |
| Re-ranking (100 docs) | 2K qps | 25ms | Minimal |
Adoption Strategy¶
Phase 1: Proof of Concept (Weeks 1-4)¶
Target: Validate RAG quality and performance
Tactics: - Migrate existing retrieval to HeliosDB-Lite - Benchmark latency vs current solution - A/B test answer quality
Success Metrics: - Retrieval latency < 20ms P99 - Answer quality maintained or improved - Developer experience positive
Phase 2: Pilot Deployment (Weeks 5-12)¶
Target: Production validation with subset
Tactics: - Deploy to 10% of RAG traffic - Monitor retrieval accuracy metrics - Gather user feedback on response quality
Success Metrics: - 99.9% availability - Retrieval accuracy > 90% Recall@10 - User satisfaction maintained
Phase 3: Full Rollout (Weeks 13+)¶
Target: Complete migration
Tactics: - Gradual traffic migration - Decommission external vector database - Document operational procedures
Success Metrics: - 100% traffic migrated - Infrastructure cost reduced 80%+ - End-to-end latency improved 50%+
Key Success Metrics¶
Technical KPIs¶
| Metric | Target | Measurement Method |
|---|---|---|
| Retrieval P99 latency | < 20ms | Application metrics |
| Recall@10 | > 90% | Evaluation dataset |
| Chunk indexing throughput | > 5K/sec | Ingestion benchmarks |
Business KPIs¶
| Metric | Target | Measurement Method |
|---|---|---|
| Infrastructure cost reduction | > 80% | Cloud billing |
| End-to-end response improvement | > 40% | User latency tracking |
| Development velocity increase | > 50% | Sprint velocity |
Conclusion¶
RAG systems are foundational to modern AI applications, from chatbots to enterprise search. The retrieval bottleneck - fetching relevant context from external vector databases - adds unacceptable latency and operational complexity. Most organizations cobble together multiple systems (vector DB + SQL + cache) just to achieve basic hybrid search.
HeliosDB-Lite solves this with a unified embedded solution: HNSW vector search combined with full SQL capabilities in a single, zero-dependency database. By running entirely in-process, retrieval latency drops from 150ms to 10ms. By embedding directly in applications, infrastructure costs drop from $500+/month to zero.
The market opportunity spans every AI application requiring contextual retrieval: from startup chatbots to enterprise knowledge bases to offline field service tools. Teams adopting HeliosDB-Lite gain faster time-to-market, lower costs, and superior performance - the competitive trifecta for AI product development.
References¶
- "Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks" (Lewis et al., 2020)
- Vector Database Benchmark Study (ANN-Benchmarks): https://ann-benchmarks.com/
- "Hybrid Search: Combining Keyword and Vector Search" (Weaviate Blog, 2023)
- Enterprise RAG Architecture Patterns (Google Cloud, 2024)
Document Classification: Business Confidential Review Cycle: Quarterly Owner: Product Marketing Adapted for: HeliosDB-Lite Embedded Database