d-carmo/basic_rag

RAG Pipeline

A production-grade Retrieval-Augmented Generation pipeline built from scratch in Python, with no LangChain and no LlamaIndex. Every stage is independently swappable and fully async, and the pipeline is deployed as three independent Docker services.

What it does

The pipeline ingests documents (PDF, DOCX, HTML, Markdown, JSON, CSV, and live web pages), chunks and enriches them, embeds them into a Qdrant vector store, and answers natural-language queries with grounded, cited responses.

Documents / URLs → [Ingest Service :8002] → Qdrant ←→ [Query Service :8001] → Answer + Citations
                              ↑
   [Crawler Service :8003] ───┘  (delegates to ingest via HTTP)

Features

  • Document loaders — PDF (with OCR fallback), DOCX, HTML, Markdown, JSON/JSONL, CSV, and live URLs
  • Web crawling — fetch a single URL or crawl an entire site; same-domain link discovery, sequential background processing with configurable delay, SHA-256 content-hash deduplication (skip unchanged pages on re-crawl), Redis-backed work queue
  • Chunking strategies — recursive, sentence-aware, section/header, semantic, parent-child
  • Enrichment — language detection, NER (spaCy), hypothetical questions, title extraction
  • Embeddings — local (BGE-M3 via sentence-transformers), OpenAI, Cohere, BM25 sparse
  • Retrieval — dense, hybrid (dense + sparse), HyDE, multi-query, step-back; cross-encoder and Cohere reranking; RRF merging
  • Context assembly — near-duplicate filtering, lost-in-the-middle reordering, token budget trimming, citation maps
  • LLM generation — Anthropic Claude (default), OpenAI, or any local model via OpenAI-compatible API (Ollama, LM Studio, vLLM); prompt injection guard; per-request faithfulness scoring (word-overlap heuristic or NLI cross-encoder)
  • REST API — FastAPI with streaming SSE, API-key auth, per-key rate limiting, CORS
  • Three-service deployment — query (8001), ingest (8002), and crawler (8003) run as independent Docker containers; each scales independently
  • Evaluation — two scoring backends: custom token-overlap heuristics (zero dependencies) or RAGAS 0.2+ LLM-as-judge; golden dataset + baseline regression
  • Observability — structlog (JSON/pretty), Prometheus metrics per service, Grafana dashboard (19 panels across 5 sections)
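The RRF merging listed under Retrieval can be illustrated with a minimal sketch of Reciprocal Rank Fusion. The function name rrf_merge, the chunk IDs, and the constant k=60 (a commonly used default) are assumptions for illustration and are not taken from the repository's code.

```python
from collections import defaultdict


def rrf_merge(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several ranked lists of chunk IDs with Reciprocal Rank Fusion."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        # Each list contributes 1 / (k + rank) for every ID it contains.
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by ID for deterministic output.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


# Hypothetical results from a dense and a sparse retriever.
dense = ["c1", "c2", "c3"]
sparse = ["c3", "c1", "c4"]
fused = rrf_merge([dense, sparse])
print([chunk_id for chunk_id, _ in fused])
```

Because RRF uses only ranks, the dense and sparse scores never need to be normalised onto a common scale before merging.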

Quick start

Prerequisites: Python 3.12+, Docker, uv or pip.

1. Clone and install

git clone <repo-url> RAG_pipeline && cd RAG_pipeline
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"

2. Configure environment

cp .env.example .env
# Edit .env — set LLM_BACKEND and the relevant API key or local server URL

Key variables:

| Variable | Default | Description |
| --- | --- | --- |
| API_KEY | dev-key | Comma-separated valid API keys for X-API-Key header |
| QDRANT_URL | http://localhost:6333 | Qdrant endpoint |
| LLM_BACKEND | anthropic | anthropic / openai / local |
| ANTHROPIC_API_KEY | | Required when LLM_BACKEND=anthropic |
| LLM_BASE_URL | | Required when LLM_BACKEND=local (e.g. http://localhost:11434/v1) |
| LLM_MODEL | claude-opus-4-8 | Model name passed to the LLM backend |
| LLM_MAX_TOKENS | 2048 | Maximum tokens in LLM response |
| LLM_TIMEOUT | 600 | HTTP timeout in seconds for LLM calls; increase for slow local models |
| EMBEDDER_BACKEND | sentence_transformer | sentence_transformer / openai / cohere |
| EMBEDDER_MODEL | BAAI/bge-m3 | Embedding model name |
| COLLECTION_NAME | rag | Qdrant collection name |
| LOG_LEVEL | INFO | DEBUG / INFO / WARNING |
| REDIS_URL | | Redis endpoint; required for URL dedup and crawl queue (e.g. redis://localhost:6379/0) |
| INGEST_SERVICE_URL | | Crawler: URL of the ingest service (http://ingest:8002 in Docker Compose, set automatically) |
| INGEST_SERVICE_API_KEY | | API key the crawler uses when calling the ingest service |
| FAITHFULNESS_BACKEND | off | Per-request faithfulness scoring: off / heuristic / nli |
| NLI_MODEL | cross-encoder/nli-deberta-v3-small | NLI cross-encoder model (used when FAITHFULNESS_BACKEND=nli). Uses per-chunk scoring to avoid DeBERTa's 512-token overflow. |
| NLI_THRESHOLD | 0.3 | Entailment probability threshold for NLI scoring (0–1). Calibrated for RAG paraphrasing, which produces lower scores than verbatim quotation. |
| EVAL_METRICS_BACKEND | custom | Offline eval scoring: custom (token-overlap) or ragas (LLM-as-judge) |

3. Start services

docker compose up --build

This starts all infrastructure and the three application services:

| Container | Port | Purpose |
| --- | --- | --- |
| qdrant | 6333 | Vector store |
| redis | 6379 | Crawl queue + content-hash dedup |
| prometheus | 9090 | Metrics collection |
| grafana | 3000 | Dashboards |
| query | 8001 | Search + answer generation (read-only Qdrant) |
| ingest | 8002 | Chunk / embed / upsert (write Qdrant) |
| crawler | 8003 | Crawl queue management (delegates URL ingestion to ingest) |

4. Verify startup

curl http://localhost:8001/health   # query service   → {"status": "ok"}
curl http://localhost:8002/health   # ingest service  → {"status": "ok"}
curl http://localhost:8003/health   # crawler service → {"status": "ok"}

For local hot-reload development (outside Docker), start only the infrastructure containers, then run the services individually:

docker compose up -d qdrant redis prometheus grafana

uvicorn rag.api.query_app:app   --host 0.0.0.0 --port 8001 --reload
uvicorn rag.api.ingest_app:app  --host 0.0.0.0 --port 8002 --reload
uvicorn rag.api.crawler_app:app --host 0.0.0.0 --port 8003 --reload

Interactive docs: http://localhost:8001/docs (query), http://localhost:8002/docs (ingest), http://localhost:8003/docs (crawler).

5. Run tests

pytest

Integration tests require a live Qdrant instance; they are marked @pytest.mark.integration and skipped by default. To deselect them explicitly and run only unit tests: pytest -m "not integration".


API usage

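All endpoints except /health, /ready, and /metrics require the header X-API-Key: <your-key>. The examples below send changeme or dev-key; substitute any key listed in your API_KEY setting.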

| Service | Port | Endpoints |
| --- | --- | --- |
| Ingest | 8002 | /v1/ingest, /v1/ingest/batch, /v1/ingest/file, /v1/ingest/url, /v1/sources, DELETE /v1/source/:id |
| Query | 8001 | /v1/query, /v1/sources |
| Crawler | 8003 | /v1/crawl/start, /v1/crawl/process, /v1/crawl/status/:domain |

Ingest raw text

curl -X POST http://localhost:8002/v1/ingest \
  -H "X-API-Key: changeme" \
  -H "Content-Type: application/json" \
  -d '{
    "text": "Retrieval-Augmented Generation combines a retrieval system with a language model...",
    "source_id": "doc-rag-intro",
    "doc_type": "text",
    "title": "Introduction to RAG"
  }'
{"source_id": "doc-rag-intro", "status": "ok", "chunk_count": 1}

Ingest a file (PDF or plain text)

Upload a file directly — the server extracts the text using the appropriate loader.

# Plain text or Markdown
curl -X POST http://localhost:8002/v1/ingest/file \
  -H "X-API-Key: changeme" \
  -F "file=@/path/to/document.txt" \
  -F "source_id=my-doc-001" \
  -F "title=My Document"

# PDF (one chunk per page)
curl -X POST http://localhost:8002/v1/ingest/file \
  -H "X-API-Key: changeme" \
  -F "file=@/path/to/document.pdf" \
  -F "source_id=my-doc-001" \
  -F "title=My Document"

Supported extensions: .pdf, .txt, .text, .rst, .md, .markdown. source_id and title are optional — source_id defaults to the filename.

Batch ingest

curl -X POST http://localhost:8002/v1/ingest/batch \
  -H "X-API-Key: changeme" \
  -H "Content-Type: application/json" \
  -d '{
    "documents": [
      {"text": "Dense retrieval embeds queries and documents...", "source_id": "doc-dense"},
      {"text": "Sparse retrieval uses term weights like BM25...", "source_id": "doc-sparse"}
    ]
  }'

Ingest a URL

Fetch a web page and ingest its text. The URL is stored as source_id, so re-ingesting later updates the content. With REDIS_URL set, unchanged pages are automatically skipped (SHA-256 content-hash comparison).

curl -X POST http://localhost:8002/v1/ingest/url \
  -H "X-API-Key: changeme" \
  -H "Content-Type: application/json" \
  -d '{"url": "https://docs.example.com/intro"}'
{"source_id": "https://docs.example.com/intro", "status": "ok", "chunk_count": 1}

Re-ingesting the same URL returns "status": "skipped" when the page content is unchanged. Add "force": true to bypass.
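The skip-if-unchanged behaviour can be sketched in a few lines. This is an illustration only: a plain dict stands in for the Redis store, and the function names content_hash and should_ingest are hypothetical. Only the SHA-256 comparison and the force flag come from the description above.

```python
import hashlib


def content_hash(text: str) -> str:
    """Return the SHA-256 hex digest of the extracted page text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def should_ingest(url: str, text: str, seen: dict[str, str], force: bool = False) -> bool:
    """Decide whether a page needs (re-)ingesting, updating the stored hash if so."""
    digest = content_hash(text)
    if not force and seen.get(url) == digest:
        return False  # unchanged since the last ingest: report "skipped"
    seen[url] = digest
    return True


hashes: dict[str, str] = {}  # stand-in for the Redis-backed hash store
url = "https://docs.example.com/intro"
print(should_ingest(url, "version 1", hashes))              # first time: ingest
print(should_ingest(url, "version 1", hashes))              # unchanged: skip
print(should_ingest(url, "version 2", hashes))              # content changed: ingest
print(should_ingest(url, "version 2", hashes, force=True))  # forced: ingest
```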

Crawl a website

Discover all same-domain links from a seed URL, enqueue them in Redis, and process them sequentially in the background. The crawler service delegates each URL to the ingest service via HTTP (INGEST_SERVICE_URL). Requires REDIS_URL.
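Same-domain link discovery can be approximated with the standard library alone. The sketch below is an assumption about the general technique, not the repository's link extractor: the names _LinkParser and same_domain_links are hypothetical, and "same domain" is taken here to mean an exact match on the host part of the URL.

```python
from html.parser import HTMLParser
from urllib.parse import urldefrag, urljoin, urlparse


class _LinkParser(HTMLParser):
    """Collect the href attribute of every <a> tag."""

    def __init__(self) -> None:
        super().__init__()
        self.hrefs: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.hrefs.append(value)


def same_domain_links(base_url: str, html: str) -> list[str]:
    """Return absolute, de-duplicated http(s) links on the same host as base_url."""
    parser = _LinkParser()
    parser.feed(html)
    domain = urlparse(base_url).netloc
    seen: set[str] = set()
    links: list[str] = []
    for href in parser.hrefs:
        # Resolve relative links and drop #fragments so one page is queued once.
        absolute, _fragment = urldefrag(urljoin(base_url, href))
        parsed = urlparse(absolute)
        if parsed.scheme in ("http", "https") and parsed.netloc == domain and absolute not in seen:
            seen.add(absolute)
            links.append(absolute)
    return links


page = (
    '<a href="/api">API</a>'
    '<a href="guide.html#top">Guide</a>'
    '<a href="https://other.example.org/x">External</a>'
    '<a href="/api">API again</a>'
    '<a href="mailto:team@example.com">Mail</a>'
)
print(same_domain_links("https://docs.example.com/intro/", page))
```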

1. Start the crawl — returns immediately, background task begins:

curl -X POST http://localhost:8003/v1/crawl/start \
  -H "X-API-Key: changeme" \
  -H "Content-Type: application/json" \
  -d '{"url": "https://docs.example.com/", "max_pages": 50, "delay_seconds": 1.0}'
{"domain": "docs.example.com", "pending_count": 43, "crawling": true}

2. Monitor progress:

curl http://localhost:8003/v1/crawl/status/docs.example.com -H "X-API-Key: changeme"
{"domain": "docs.example.com", "pending": 31, "processing": 1, "visited": 11, "active": true}

When active is false and pending is 0, the crawl is complete. Redis queue state (pending / processing / seen / visited) is cleaned up automatically; content hashes are kept so the next crawl can skip unchanged pages.

3. Manual step-through (useful for debugging):

curl -X POST http://localhost:8003/v1/crawl/process \
  -H "X-API-Key: changeme" \
  -H "Content-Type: application/json" \
  -d '{"domain": "docs.example.com"}'
{"url": "https://docs.example.com/api", "status": "ok", "chunk_count": 1, "error": null}

Possible status values: ok, skipped (content unchanged), empty (no text extracted), error (fetch failed — URL re-queued for retry), done (queue empty).
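The completion rule from step 2 (active is false and pending is 0) can be wrapped in a small polling helper. This is a sketch under stated assumptions: fetch_status is a hypothetical callable that returns the parsed JSON of /v1/crawl/status/:domain, and the interval and poll limit are arbitrary.

```python
import time
from collections.abc import Callable


def crawl_finished(status: dict) -> bool:
    """A crawl is complete when it is no longer active and nothing is pending."""
    return not status.get("active", False) and status.get("pending", 0) == 0


def wait_for_crawl(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    max_polls: int = 100,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll fetch_status until the crawl completes; raise TimeoutError otherwise."""
    for _ in range(max_polls):
        status = fetch_status()
        if crawl_finished(status):
            return status
        sleep(interval)
    raise TimeoutError(f"crawl still running after {max_polls} polls")


# Simulated status responses in place of real HTTP calls.
responses = iter([
    {"domain": "docs.example.com", "pending": 31, "processing": 1, "visited": 11, "active": True},
    {"domain": "docs.example.com", "pending": 0, "processing": 1, "visited": 42, "active": True},
    {"domain": "docs.example.com", "pending": 0, "processing": 0, "visited": 43, "active": False},
])
final = wait_for_crawl(lambda: next(responses), sleep=lambda _seconds: None)
print(final["visited"])
```

Injecting sleep keeps the helper testable without real delays; in practice fetch_status would issue the authenticated GET shown in step 2.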


Query

curl -X POST http://localhost:8001/v1/query \
  -H "X-API-Key: changeme" \
  -H "Content-Type: application/json" \
  -d '{"query": "How does dense retrieval work?", "top_k": 5}'
{
  "answer": "Dense retrieval embeds both the query and documents into a shared vector space...",
  "sources": [
    {"citation_number": 1, "source_id": "doc-dense", "title": null, "score": 0.92}
  ],
  "trace_id": "3f2a...",
  "faithfulness_score": 0.83,
  "truncated": false
}

faithfulness_score is null when FAITHFULNESS_BACKEND=off (the default). Set it to heuristic or nli in .env to get a score between 0 and 1 on every response.
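On the client side, the response can be turned into a readable answer with a numbered source list. The helper below is a hypothetical sketch that relies only on the fields shown in the example response; format_answer is not part of the pipeline.

```python
def format_answer(response: dict) -> str:
    """Render a /v1/query response as answer text followed by numbered sources."""
    lines = [response["answer"], ""]
    for src in sorted(response.get("sources", []), key=lambda s: s["citation_number"]):
        # Fall back to the source_id when no title was stored.
        label = src.get("title") or src["source_id"]
        lines.append(f"[{src['citation_number']}] {label} (score {src['score']:.2f})")
    score = response.get("faithfulness_score")
    if score is not None:  # null when FAITHFULNESS_BACKEND=off
        lines.append(f"faithfulness: {score:.2f}")
    return "\n".join(lines)


sample = {
    "answer": "Dense retrieval embeds both the query and documents into a shared vector space...",
    "sources": [{"citation_number": 1, "source_id": "doc-dense", "title": None, "score": 0.92}],
    "trace_id": "3f2a...",
    "faithfulness_score": 0.83,
    "truncated": False,
}
print(format_answer(sample))
```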

Streaming query (SSE)

curl -X POST http://localhost:8001/v1/query \
  -H "X-API-Key: changeme" \
  -H "Content-Type: application/json" \
  -d '{"query": "Explain RAG", "stream": true}'

Each line is a Server-Sent Event:

data: {"chunk": "Retrieval-Augmented"}
data: {"chunk": " Generation combines"}
...
data: {"done": true, "trace_id": "3f2a..."}

List sources

curl http://localhost:8001/v1/sources -H "X-API-Key: dev-key"
{
  "sources": [
    {"source_id": "my-doc-001", "chunk_count": 12, "ingested_at": "2026-06-10T14:23:01.123456"}
  ],
  "total": 1
}

ingested_at is the ISO 8601 timestamp of the earliest chunk written for that source, read from the created_at payload field in Qdrant.

Delete a source

curl -X DELETE http://localhost:8002/v1/source/doc-rag-intro -H "X-API-Key: dev-key"

Health and readiness checks

curl http://localhost:8001/health   # {"status": "ok"}
curl http://localhost:8001/ready    # {"status": "ok", "checks": {"qdrant": true}}
curl http://localhost:8002/ready    # {"status": "ok", "checks": {"qdrant": true, "redis": true}}
curl http://localhost:8003/ready    # {"status": "ok", "checks": {"redis": true}}

Each service reports only its own dependencies in /ready.

Prometheus metrics

curl http://localhost:8001/metrics  # query service metrics — no auth required
curl http://localhost:8002/metrics  # ingest service metrics
curl http://localhost:8003/metrics  # crawler service metrics

Using a local LLM

Point the pipeline at any OpenAI-compatible server; no API key is needed. Set these in .env (or inline):

LLM_BACKEND=local
LLM_BASE_URL=http://localhost:11434/v1   # Ollama
# LLM_BASE_URL=http://localhost:1234/v1  # LM Studio
# LLM_BASE_URL=http://localhost:8000/v1  # vLLM
LLM_MODEL=llama3
LLM_TIMEOUT=600   # increase if your model is slow to respond

Running evaluation

Two scoring backends are available, selectable via --metrics-backend.

Custom (default — no extra dependencies)

Uses token-overlap heuristics: token-F1 for answer similarity, set-overlap for context recall/precision, word-overlap for faithfulness.

python -m rag.eval run \
  --dataset eval/golden.jsonl \
  --report eval/results/report.json \
  --baseline eval/baseline.json
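For intuition, token-F1 can be sketched as follows. This is the common bag-of-tokens formulation; the tokenizer (lowercased word characters) and the function names are assumptions, and the repository's custom backend may tokenize or normalise differently.

```python
import re
from collections import Counter


def tokenize(text: str) -> list[str]:
    """Lowercase the text and split it into word tokens."""
    return re.findall(r"\w+", text.lower())


def token_f1(prediction: str, reference: str) -> float:
    """Harmonic mean of token precision and recall between two strings."""
    pred, ref = tokenize(prediction), tokenize(reference)
    if not pred or not ref:
        # Two empty strings match perfectly; one empty string matches nothing.
        return float(pred == ref)
    # Multiset intersection counts each shared token at most as often as it appears in both.
    overlap = sum((Counter(pred) & Counter(ref)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred)
    recall = overlap / len(ref)
    return 2 * precision * recall / (precision + recall)


score = token_f1("Dense retrieval embeds queries", "Dense retrieval embeds documents")
print(f"{score:.2f}")
```

Here three of the four tokens overlap on each side, so precision and recall are both 0.75 and the F1 is 0.75.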

RAGAS (LLM-as-judge)

Uses RAGAS 0.2+ for all four metrics. Each metric calls an LLM judge — by default OpenAI (OPENAI_API_KEY must be set). Requires the eval extras.

pip install -e ".[eval]"

python -m rag.eval run \
  --dataset eval/golden.jsonl \
  --report eval/results/report.json \
  --metrics-backend ragas

Output

Evaluation complete: eval/results/report.json
  answer_similarity: 0.7200
  context_recall:    0.8500
  context_precision: 0.7800
  faithfulness:      0.6900

Comparison vs baseline:
  answer_similarity: 0.7200 (+0.0300)
  ...

The golden dataset lives in eval/golden.jsonl. Add Q&A triples there and commit eval/baseline.json after a successful run to set a regression baseline.
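The baseline comparison amounts to a per-metric difference. The sketch below assumes, for illustration only, that both files reduce to a flat mapping of metric name to score; the actual layout of report.json and baseline.json is not documented here, and compare_to_baseline and its tolerance parameter are hypothetical.

```python
def compare_to_baseline(
    report: dict[str, float],
    baseline: dict[str, float],
    tolerance: float = 0.0,
) -> tuple[list[str], list[str]]:
    """Return formatted delta lines and the names of metrics that regressed."""
    lines: list[str] = []
    regressions: list[str] = []
    for metric in sorted(report):
        if metric not in baseline:
            continue  # nothing to compare against
        delta = report[metric] - baseline[metric]
        lines.append(f"{metric}: {report[metric]:.4f} ({delta:+.4f})")
        if delta < -tolerance:
            regressions.append(metric)
    return lines, regressions


# Illustrative numbers only, not real evaluation results.
current = {"answer_similarity": 0.72, "faithfulness": 0.69}
previous = {"answer_similarity": 0.69, "faithfulness": 0.70}
delta_lines, regressed = compare_to_baseline(current, previous)
print("\n".join(delta_lines))
print("regressions:", regressed)
```

A CI job could fail the build when the regression list is non-empty, with tolerance absorbing small run-to-run noise.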


Observability

Prometheus

Prometheus scrapes all three services' /metrics endpoints and is available at http://localhost:9090 after docker compose up. Scrape targets are pre-configured in docker/prometheus/prometheus.yml as three jobs: rag-query, rag-ingest, rag-crawler.

Time-series data is stored in the prometheus_data Docker named volume, mounted at /prometheus inside the container. Data persists across container restarts. To inspect the volume path on disk:

docker volume inspect rag_pipeline_prometheus_data

Useful queries to get started:

| What | PromQL |
| --- | --- |
| Query rate (req/min) | rate(http_requests_total{path="/v1/query"}[1m]) * 60 |
| p99 query latency | histogram_quantile(0.99, rate(http_request_duration_seconds_bucket{path="/v1/query"}[5m])) |
| Total chunks ingested | sum(chunks_ingested_total) |
| Retrieval p99 latency | histogram_quantile(0.99, rate(rag_retrieval_duration_seconds_bucket[5m])) |
| Faithfulness p50 | histogram_quantile(0.5, rate(rag_faithfulness_score_bucket[5m])) |
| Crawl queue depth | rag_crawl_queue_pending{domain="example.com"} |
| 5xx error rate by service | rate(http_requests_total{status=~"5.."}[1m]) * 60 |

Open the expression browser at http://localhost:9090/graph, type a query, and click Execute.

To check which targets are being scraped: http://localhost:9090/targets

Grafana

Grafana is available at http://localhost:3000.

Default credentials: admin / admin (you will be prompted to change the password on first login).

The RAG Pipeline dashboard is pre-provisioned and appears under Dashboards → RAG automatically. It has 19 panels across 5 sections:

| Section | Panels |
| --- | --- |
| Summary | Query rate, query p99 latency, total chunks in store, 5xx error rate |
| Query Service | Latency p50/p95/p99 over time, requests by status code |
| Ingest Service | Chunks ingested per minute by route, endpoint latency p95 |
| Crawler Service | Crawl endpoint request rates, per-service traffic overview |
| RAG Quality | Faithfulness p50/p95, retrieval latency p50/p95/p99, crawl queue depth by domain |

To find it manually: Dashboards (left sidebar) → search for RAG Pipeline.

The Prometheus datasource is also pre-provisioned — no manual connection setup is required. To change the default admin password, set it in .env before starting:

GRAFANA_ADMIN_PASSWORD=mysecretpassword

Production deployment

All three services are built from a single shared Docker image. To start everything:

docker compose up --build -d

At minimum, set these in .env before starting:

  • API_KEY — a strong random key
  • ANTHROPIC_API_KEY (or your LLM credentials)
  • REDIS_URL=redis://redis:6379/0
  • QDRANT_URL=http://qdrant:6333
  • GRAFANA_ADMIN_PASSWORD

The crawler service calls the ingest service for each crawled URL. INGEST_SERVICE_URL=http://ingest:8002 is set automatically by docker-compose.yml. Add resource limits and restart: always to the compose services as needed.


Scalability

The three services are stateless: they share only external infrastructure (Qdrant + Redis) and no in-process state. Each can be scaled independently:

| Bottleneck | Before split | After split |
| --- | --- | --- |
| Query latency under ingest load | Embedder and query compete for CPU | Query service has no embedder; ingest scales independently |
| Crawl speed | 1 background task per process | Multiple crawler pods can own disjoint domains |
| Embedding throughput | CPU-bound, one model per process | Ingest service scales to GPU nodes independently |
| Crawl task durability | active_crawls in-memory, lost on restart | Crawler delegates via HTTP; 5xx responses trigger queue retry |

Kubernetes topology

Deployment: query-service     # 2–20 replicas, HPA on CPU + latency
Deployment: ingest-service    # 1–5 replicas, HPA on CPU
Deployment: crawler-service   # 1–N replicas, manual or per-domain

StatefulSet: qdrant           # 1–3 replicas with PVC
StatefulSet: redis            # 1 replica (Redis Sentinel for HA)

See ARCHITECTURE.md for the full improvement roadmap, including durable task queues (Redis Streams / Celery), query result caching, per-tenant collections, and an async NLI sidecar.


Project structure

src/rag/
├── api/
│   ├── query_app.py    # Query service entry point (port 8001)
│   ├── ingest_app.py   # Ingest service entry point (port 8002)
│   ├── crawler_app.py  # Crawler service entry point (port 8003)
│   ├── api_metrics.py  # Prometheus metric definitions (counters, histograms, gauges)
│   ├── config.py       # Settings (from_env)
│   ├── deps.py         # FastAPI dependency injection helpers
│   ├── middleware.py   # MetricsMiddleware, auth, rate limiting
│   └── routes/
│       ├── admin.py      # /health, /ready, /metrics, /sources, DELETE /source
│       ├── crawl.py      # /crawl/start, /crawl/process, /crawl/status
│       ├── ingest.py     # /ingest, /ingest/batch
│       ├── query.py      # /query (JSON + SSE stream)
│       ├── upload.py     # /ingest/file
│       └── url_ingest.py # /ingest/url (shared by ingest + crawler services)
├── assembler/      # Context assembly: dedup, reorder, budget, citations
├── chunker/        # Chunking strategies (recursive, sentence, semantic, …)
├── crawler/        # Site crawl: Redis work queue, link extractor
├── embedder/       # Dense + sparse embedders, factory
├── enricher/       # Language, NER, hypothetical questions, title
├── eval/           # Evaluation: dataset, metrics, runner, CLI
├── generation/     # LLM backends, prompts, guard, faithfulness, pipeline
├── ingestion/      # IngestionLog (SQLite)
├── loaders/        # Document loaders (PDF, DOCX, HTML, URL, …)
├── observability/  # Logging, tracing
├── retriever/      # Retrievers, transforms, rerankers, pipeline
└── vector_store/   # Qdrant store, schema, filters, snapshots
tests/              # Unit tests mirroring src/rag/
eval/               # golden.jsonl, baseline.json
docker/             # Dockerfile, Grafana provisioning, Prometheus config

Tech stack

| Layer | Library |
| --- | --- |
| Language | Python 3.12 |
| API | FastAPI + uvicorn (three independent services) |
| Vector store | Qdrant |
| Embeddings | sentence-transformers, OpenAI, Cohere |
| LLM | Anthropic Claude (default), OpenAI-compatible |
| Crawl queue / dedup | Redis (LIST + HASH + SET for queue; STRING for content hashes) |
| Observability | structlog, Prometheus, OpenTelemetry, Grafana |
| Testing | pytest, pytest-asyncio, httpx |
| Linting | ruff, mypy (strict) |

About

A RAG pipeline I'm creating to learn more about it
