From e63657aa9122e236dc19b11858a83513c2471507 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Thu, 18 Jun 2026 20:49:06 +0800 Subject: [PATCH 1/7] feat: add ingestion mutation recovery helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add openkb/mutation.py, the transactional layer for KB mutations, plus tests. MutationSnapshot snapshots the target paths, journals the intent, and restores on failure across active/committed/rolled_back states (children restored before parents; self-cleans its backup dir on partial failure). snapshot_paths writes the active journal as the recovery signal; recover_pending_journals rolls back active journals and discards terminal ones. publish_staged_tree copies staged raw/source artifacts into place. Module and tests only — wired into the add path in the next commit. --- openkb/mutation.py | 216 +++++++++++++++++++++++++++++++++++++++++ tests/test_mutation.py | 78 +++++++++++++++ 2 files changed, 294 insertions(+) create mode 100644 openkb/mutation.py create mode 100644 tests/test_mutation.py diff --git a/openkb/mutation.py b/openkb/mutation.py new file mode 100644 index 00000000..cdac260b --- /dev/null +++ b/openkb/mutation.py @@ -0,0 +1,216 @@ +"""Transactional helpers for KB mutation paths.""" +from __future__ import annotations + +import json +import logging +import shutil +import uuid +from dataclasses import dataclass, field +from pathlib import Path + +from openkb.locks import atomic_write_bytes, atomic_write_json + +logger = logging.getLogger(__name__) + + +def _copy_file_atomic(src: Path, dest: Path) -> None: + dest.parent.mkdir(parents=True, exist_ok=True) + atomic_write_bytes(dest, src.read_bytes()) + + +@dataclass +class MutationSnapshot: + """Snapshot of final KB paths touched by a mutation attempt.""" + + kb_dir: Path + backup_dir: Path + journal_path: Path + operation: str + details: dict = field(default_factory=dict) + entries: dict[Path, Path | None] = 
field(default_factory=dict) + + def _journal_data(self, status: str) -> dict: + return { + "version": 1, + "operation": self.operation, + "status": status, + "kb_dir": str(self.kb_dir), + "backup_dir": str(self.backup_dir), + "details": self.details, + "entries": [ + { + "target": str(target), + "backup": str(backup) if backup is not None else None, + } + for target, backup in self.entries.items() + ], + } + + def write_journal(self, status: str) -> None: + self.journal_path.parent.mkdir(parents=True, exist_ok=True) + atomic_write_json(self.journal_path, self._journal_data(status)) + + def mark_committed(self) -> None: + """Mark the journal committed without removing the backup. + + Call this the instant the mutation is durably applied (e.g. the + registry write has landed) so a subsequent + :func:`recover_pending_journals` discards the journal instead of + rolling it back. This is the commit signal; :meth:`discard` is the + post-commit cleanup that also removes the backup dir and journal + file and must itself be best-effort — it runs *after* the commit + point and its failure must never trigger a rollback. + """ + self.write_journal("committed") + + def rollback(self) -> None: + # Restore children before parents so directory deletes cannot remove + # paths that still need to be restored from a more specific backup. + for target, backup in sorted( + self.entries.items(), + key=lambda item: len(item[0].parts), + reverse=True, + ): + # Removal is unconditional; the backup (if any) is then restored + # in its place. 
+ if target.is_dir(): + shutil.rmtree(target, ignore_errors=True) + else: + target.unlink(missing_ok=True) + if backup is None: + continue + target.parent.mkdir(parents=True, exist_ok=True) + if backup.is_dir(): + shutil.copytree(backup, target) + else: + _copy_file_atomic(backup, target) + self.write_journal("rolled_back") + + def rollback_best_effort(self) -> Exception | None: + try: + self.rollback() + except Exception as exc: + logger.warning("Mutation rollback failed: %s", exc) + return exc + return None + + def discard(self) -> None: + # Best-effort post-commit/post-rollback cleanup: callers have already + # written a terminal status (mark_committed or rollback), so there is + # nothing to re-write here — doing so would be dead work and would + # silently downgrade a "rolled_back" journal to "committed" moments + # before it is deleted. + shutil.rmtree(self.backup_dir, ignore_errors=True) + self.journal_path.unlink(missing_ok=True) + + def discard_best_effort(self) -> Exception | None: + try: + self.discard() + except Exception as exc: + logger.warning("Mutation journal cleanup failed: %s", exc) + return exc + return None + + +def snapshot_paths( + kb_dir: Path, + paths: list[Path], + *, + operation: str, + details: dict | None = None, +) -> MutationSnapshot: + """Snapshot final KB paths before a mutation starts.""" + kb_dir = kb_dir.resolve() + journal_id = uuid.uuid4().hex + backup_dir = kb_dir / ".openkb" / "staging" / f"rollback-{journal_id}" + backup_dir.mkdir(parents=True, exist_ok=False) + snapshot = MutationSnapshot( + kb_dir=kb_dir, + backup_dir=backup_dir, + journal_path=kb_dir / ".openkb" / "journal" / f"{journal_id}.json", + operation=operation, + details=details or {}, + ) + try: + for path in paths: + target = path.resolve() + if target in snapshot.entries: + continue + if not target.exists(): + snapshot.entries[target] = None + continue + rel = target.relative_to(kb_dir) + backup = backup_dir / rel + backup.parent.mkdir(parents=True, 
exist_ok=True) + if target.is_dir(): + shutil.copytree(target, backup) + else: + shutil.copy2(target, backup) + snapshot.entries[target] = backup + # The active journal is the recovery signal: once this exists, a future + # process can restore every recorded target even if the current one exits. + snapshot.write_journal("active") + except Exception: + # Partial snapshot: backup_dir exists on disk but no journal was + # written. recover_pending_journals only scans journals, so remove the + # orphan backup here — otherwise it leaks forever with nothing able to + # reach or clean it. + shutil.rmtree(backup_dir, ignore_errors=True) + raise + return snapshot + + +def _snapshot_from_journal(path: Path, data: dict) -> MutationSnapshot: + snapshot = MutationSnapshot( + kb_dir=Path(data["kb_dir"]), + backup_dir=Path(data["backup_dir"]), + journal_path=path, + operation=data.get("operation", "mutation"), + details=data.get("details") or {}, + ) + snapshot.entries = { + Path(item["target"]): Path(item["backup"]) if item.get("backup") else None + for item in data.get("entries", []) + } + return snapshot + + +def recover_pending_journals(kb_dir: Path) -> list[str]: + """Rollback active journals left by an interrupted process.""" + journal_dir = kb_dir / ".openkb" / "journal" + if not journal_dir.is_dir(): + return [] + messages: list[str] = [] + for journal_path in sorted(journal_dir.glob("*.json")): + try: + data = json.loads(journal_path.read_text(encoding="utf-8")) + snapshot = _snapshot_from_journal(journal_path, data) + status = data.get("status", "active") + if status in {"committed", "rolled_back"}: + snapshot.discard() + messages.append(f"Cleaned terminal mutation journal {journal_path.name}.") + continue + snapshot.rollback() + snapshot.discard() + messages.append( + f"Rolled back interrupted {snapshot.operation} journal {journal_path.name}." 
+ ) + except Exception as exc: + messages.append( + f"Could not recover journal {journal_path.name}: {type(exc).__name__}: {exc}" + ) + return messages + + +def publish_staged_tree(staging_dir: Path | None, kb_dir: Path) -> None: + """Copy staged raw/source artifacts into their final KB locations.""" + if staging_dir is None or not staging_dir.exists(): + return + for rel in ("raw", "wiki/sources"): + src_root = staging_dir / rel + if not src_root.exists(): + continue + for src in src_root.rglob("*"): + if not src.is_file(): + continue + _copy_file_atomic(src, kb_dir / rel / src.relative_to(src_root)) diff --git a/tests/test_mutation.py b/tests/test_mutation.py new file mode 100644 index 00000000..c08ff89f --- /dev/null +++ b/tests/test_mutation.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import pytest + +from openkb.mutation import recover_pending_journals, snapshot_paths + + +def test_recover_pending_add_journal_rolls_back_files(tmp_path): + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + new_file = kb_dir / "wiki" / "sources" / "doc.md" + + snapshot_paths( + kb_dir, + [target, new_file], + operation="add", + details={"doc_name": "doc"}, + ) + target.write_text("after", encoding="utf-8") + new_file.parent.mkdir(parents=True) + new_file.write_text("new", encoding="utf-8") + + messages = recover_pending_journals(kb_dir) + + assert any("Rolled back interrupted add journal" in message for message in messages) + assert target.read_text(encoding="utf-8") == "before" + assert not new_file.exists() + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_mark_committed_prevents_recovery_rollback(tmp_path): + """A snapshot marked committed must be discarded (not rolled back) by + recovery — the commit signal that protects a completed mutation from + being undone when post-commit 
cleanup fails. + """ + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [target], operation="add", details={"doc_name": "doc"} + ) + target.write_text("after", encoding="utf-8") # the "committed" mutation + snapshot.mark_committed() + + messages = recover_pending_journals(kb_dir) + + assert any("Cleaned terminal mutation journal" in m for m in messages) + assert target.read_text(encoding="utf-8") == "after" # NOT rolled back + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_snapshot_paths_cleans_backup_dir_on_failure(tmp_path): + """A partially-created snapshot must not leak its backup dir: on any + failure before the journal is written, snapshot_paths removes the + rollback dir it created (recover_pending_journals only scans journals + and could never reach it otherwise). + """ + kb_dir = tmp_path / "kb" + kb_dir.mkdir() + # A target that resolves OUTSIDE kb_dir makes relative_to(kb_dir) raise + # mid-loop, after backup_dir was already mkdir'd. + outside = tmp_path / "outside.txt" + outside.write_text("hi", encoding="utf-8") + + with pytest.raises(ValueError): + snapshot_paths(kb_dir, [outside], operation="add", details={}) + + staging = kb_dir / ".openkb" / "staging" + if staging.exists(): + assert not any(staging.iterdir()) # no orphan rollback- dir From ddd4aee4ffebf0ad040781505b30423e07794209 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Thu, 18 Jun 2026 20:49:22 +0800 Subject: [PATCH 2/7] feat: add concurrent ingestion pipeline Parallelize directory `add`: prepare files concurrently (hash, prefilter, staging, convert) while live-KB mutation stays serialized under the mutation lock. 
- Split add into prepare (file-local, into an isolated staging dir) and commit (under the lock): snapshot -> publish -> index -> compile -> registry write -> mark_committed, with snapshot rollback on failure. - convert_document gains assume_locked/staging_dir/doc_name_override so parallel prepares never write the live KB unlocked. - Reserve doc_names in scan order via HashRegistry.memory so same-stem files behave like serial adds. - New file_processing_jobs config (default 2). --- README.md | 1 + config.yaml.example | 1 + openkb/cli.py | 539 ++++++++++++++++++++++++++++++-------- openkb/config.py | 1 + openkb/converter.py | 59 ++++- openkb/state.py | 19 ++ tests/test_add_command.py | 419 ++++++++++++++++++++++++++++- 7 files changed, 916 insertions(+), 123 deletions(-) diff --git a/README.md b/README.md index 100c9fa6..86b3701c 100644 --- a/README.md +++ b/README.md @@ -333,6 +333,7 @@ OpenKB settings are initialized by `openkb init` and stored in `.openkb/config.y model: gpt-5.4 # LLM model (any LiteLLM-supported provider) language: en # Wiki output language pageindex_threshold: 20 # PDF pages threshold for PageIndex +file_processing_jobs: 2 # Files to prepare concurrently during `openkb add ` ``` Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/providers) (OpenAI models can omit the prefix): diff --git a/config.yaml.example b/config.yaml.example index 45b4b8c3..7f9b5743 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -1,6 +1,7 @@ model: gpt-5.4 # LLM model (any LiteLLM-supported provider) language: en # Wiki output language pageindex_threshold: 20 # PDF pages threshold for PageIndex +file_processing_jobs: 2 # Number of files to prepare concurrently during `openkb add ` # Optional: extra HTTP headers sent with every LLM request (forwarded to # LiteLLM's extra_headers). Some providers need these — e.g. 
GitHub Copilot diff --git a/openkb/cli.py b/openkb/cli.py index b48ebc17..7d91ae5e 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -8,11 +8,16 @@ warnings.filterwarnings("ignore") import asyncio +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from dataclasses import dataclass import json import logging import shutil import sys import time +import unicodedata +import uuid from functools import wraps from pathlib import Path from typing import Literal @@ -45,9 +50,10 @@ def filter(self, record: logging.LogRecord) -> bool: DEFAULT_CONFIG, load_config, save_config, load_global_config, register_kb, resolve_extra_headers, set_extra_headers, ) -from openkb.converter import _registry_path, convert_document +from openkb.converter import _registry_path, _sanitize_stem, convert_document, resolve_doc_name from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log +from openkb.mutation import MutationSnapshot, publish_staged_tree, recover_pending_journals, snapshot_paths from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS # Suppress warnings after all imports — markitdown overrides filters at import time @@ -67,6 +73,7 @@ def filter(self, record: logging.LogRecord) -> bool: # handled by LiteLLM itself) — no API key env var is needed, so the # missing-key warning would be a false alarm for them. 
_OAUTH_PROVIDERS = {"chatgpt", "github_copilot"} +_AddOutcome = Literal["added", "skipped", "failed"] def _extract_provider(model: str) -> str | None: @@ -274,13 +281,379 @@ def _clear_existing_skill_dir(kb_dir: Path, name: str) -> None: shutil.rmtree(target) -def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: - """Convert, index, and compile a single document under the KB mutation lock.""" +@dataclass +class _PreparedAdd: + file_path: Path + result: object | None = None + staging_dir: Path | None = None + outcome: _AddOutcome | None = None + error_stage: str = "" + error: Exception | None = None + + +def _positive_int(value: object, default: int) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + return default + return max(1, parsed) + + +def _reserve_batch_doc_names(files: list[Path], kb_dir: Path) -> dict[Path, str]: + """Reserve doc_names in scan order so parallel prepare matches serial add.""" + from openkb.state import HashRegistry + + # In-memory registry view: same resolve contract as the on-disk registry + # but add() never persists. persist_legacy=True below therefore only + # *consumes* a matched legacy entry in memory (backfilling its path) so a + # later same-stem file in this batch no longer re-matches it via + # find_legacy_by_stem — the idempotency fix. With a persisting registry + # that same call would write disk on every reservation. + batch_registry = HashRegistry.memory( + HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() + ) + reserved: dict[Path, str] = {} + for file_path in files: + doc_name = resolve_doc_name( + file_path, + kb_dir, + batch_registry, + persist_legacy=True, + ) + reserved[file_path] = doc_name + # Future files in the same batch must see this name as already taken; + # otherwise same-stem files prepared in parallel could all claim it. 
+ batch_registry.add( + f"batch:{len(reserved)}:{_registry_path(file_path, kb_dir)}", + { + "name": file_path.name, + "doc_name": doc_name, + "path": _registry_path(file_path, kb_dir), + }, + ) + return reserved + + +def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path: + safe = _sanitize_stem(file_path.stem) + # uuid (not time.time_ns()) for uniqueness: two same-stem files sanitize to + # the same `safe`, so the timestamp was the only differentiator and a + # repeated wall-clock ns would make mkdir(exist_ok=False) crash the batch. + path = kb_dir / ".openkb" / "staging" / f"add-{safe}-{uuid.uuid4().hex[:8]}" + path.mkdir(parents=True, exist_ok=False) + return path + + +def _cleanup_staging(path: Path | None) -> None: + if path is not None: + shutil.rmtree(path, ignore_errors=True) + + +def _prepare_add_file( + file_path: Path, + kb_dir: Path, + staging_dir: Path | None, + doc_name: str | None = None, +) -> _PreparedAdd: + """Run file-local conversion into an optional staging directory.""" + logger = logging.getLogger(__name__) + try: + result = convert_document( + file_path, + kb_dir, + assume_locked=True, + staging_dir=staging_dir, + doc_name_override=doc_name, + ) + except Exception as exc: + logger.debug("Conversion traceback:", exc_info=True) + return _PreparedAdd( + file_path=file_path, + staging_dir=staging_dir, + outcome="failed", + error_stage="Conversion", + error=exc, + ) + if result.skipped: + return _PreparedAdd( + file_path=file_path, + result=result, + staging_dir=staging_dir, + outcome="skipped", + ) + return _PreparedAdd(file_path=file_path, result=result, staging_dir=staging_dir) + + +def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]: + final_raw = None + final_source = None + if result.raw_path is not None: + final_raw = kb_dir / "raw" / result.raw_path.name + if result.source_path is not None: + final_source = kb_dir / "wiki" / "sources" / result.source_path.name + return final_raw, final_source + + +def 
_snapshot_add_paths(kb_dir: Path, doc_name: str, final_raw: Path | None, final_source: Path | None) -> list[Path]: + paths = [ + kb_dir / ".openkb" / "hashes.json", + kb_dir / ".openkb" / "pageindex.db", + # SQLite may keep recently-written PageIndex state in sidecar files. + # Snapshot missing paths too, so rollback removes sidecars created + # during a failed long-document ingest. + kb_dir / ".openkb" / "pageindex.db-wal", + kb_dir / ".openkb" / "pageindex.db-shm", + kb_dir / ".openkb" / "pageindex.db-journal", + kb_dir / ".openkb" / "files", + kb_dir / "wiki" / "summaries" / f"{doc_name}.md", + kb_dir / "wiki" / "sources" / f"{doc_name}.json", + kb_dir / "wiki" / "sources" / "images" / doc_name, + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / "wiki" / "index.md", + kb_dir / "wiki" / "log.md", + ] + if final_raw is not None: + paths.append(final_raw) + if final_source is not None: + paths.append(final_source) + return paths + + +def _run_compile_with_retry(coro_factory, label: str) -> None: + """Run an async compile coroutine once, retrying once after 2s on failure. + + ``coro_factory`` builds a fresh coroutine each call (an ``asyncio.run``-consumed + coroutine can't be awaited twice). Raises on the second failure so the caller's + snapshot-rollback path runs. 
+ """ + logger = logging.getLogger(__name__) + click.echo(f" {label}...") + for attempt in range(2): + try: + asyncio.run(coro_factory()) + return + except Exception as exc: + if attempt == 0: + click.echo(" Retrying compilation in 2s...") + time.sleep(2) + else: + click.echo(f" [ERROR] Compilation failed: {exc}") + logger.debug("Compilation traceback:", exc_info=True) + raise + + +def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _AddOutcome: + """Commit a prepared add while the caller holds the KB mutation lock.""" + from openkb.agent.compiler import compile_long_doc, compile_short_doc + from openkb.state import HashRegistry + + logger = logging.getLogger(__name__) + file_path = prepared.file_path + + if prepared.outcome == "failed": + click.echo(f" [ERROR] {prepared.error_stage} failed: {prepared.error}") + _cleanup_staging(prepared.staging_dir) + return "failed" + if prepared.outcome == "skipped": + click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") + _cleanup_staging(prepared.staging_dir) + return "skipped" + if prepared.result is None: + click.echo(f" [ERROR] Conversion failed: no result for {file_path.name}") + _cleanup_staging(prepared.staging_dir) + return "failed" + + result = prepared.result + registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") + if result.file_hash and registry.is_known(result.file_hash): + click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") + _cleanup_staging(prepared.staging_dir) + return "skipped" + + doc_name = result.doc_name or file_path.stem + norm_doc_name = unicodedata.normalize("NFKC", doc_name) + norm_file_name = unicodedata.normalize("NFKC", file_path.name) + path_key = _registry_path(file_path, kb_dir) + for existing_hash, meta in registry.all_entries().items(): + existing_name = meta.get("doc_name") or Path(meta.get("name", "")).stem + if ( + existing_hash != result.file_hash + and unicodedata.normalize("NFKC", existing_name) == norm_doc_name + ): + 
existing_path = meta.get("path") + # Same document iff it shares our path, OR it is a legacy + # (pre-path-index) entry with no path whose filename matches — the + # pre-collision form of this same document being re-ingested. A + # path-indexed entry must NOT be matched by filename alone: two + # different files can share a name, and a concurrent add could + # claim this doc_name in the window between reservation and commit. + # NFKC-normalize names: macOS reports filenames in NFD, so a raw + # `==` would mis-classify a same-document re-add as a conflict. + if existing_path == path_key or ( + not existing_path + and unicodedata.normalize("NFKC", meta.get("name") or "") == norm_file_name + ): + continue + click.echo( + " [ERROR] Document name conflict after parallel preparation: " + f"'{doc_name}' is already used by {meta.get('name', 'another document')}. " + "Re-run this file after the current batch." + ) + _cleanup_staging(prepared.staging_dir) + return "failed" + + final_raw, final_source = _final_artifact_paths(result, kb_dir) + snapshot: MutationSnapshot | None = None + index_result = None + + try: + # Take the snapshot inside the try: a failure here (disk-full copytree + # of concepts/entities, permission, relative_to ValueError) used to + # propagate uncaught — aborting the whole remaining batch and leaking + # an unjournaled backup dir. snapshot_paths now self-cleans its backup + # dir on partial failure, so when snapshot stays None there is nothing + # to roll back and the except branch handles that case. 
+ snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), + operation="add", + details={"file_hash": result.file_hash, "name": file_path.name, "doc_name": doc_name}, + ) + publish_staged_tree(prepared.staging_dir, kb_dir) + if final_raw is not None: + result.raw_path = final_raw + if final_source is not None: + result.source_path = final_source + + if result.is_long_doc: + click.echo(" Long document detected — indexing with PageIndex...") + try: + from openkb.indexer import index_long_document + + index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name) + except Exception as exc: + click.echo(f" [ERROR] Indexing failed: {exc}") + logger.debug("Indexing traceback:", exc_info=True) + raise + + summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + index_result.doc_id, + kb_dir, + model, + doc_description=index_result.description, + ), + label=f"Compiling long doc (doc_id={index_result.doc_id})", + ) + else: + _run_compile_with_retry( + lambda: compile_short_doc(doc_name, result.source_path, kb_dir, model), + label="Compiling short doc", + ) + + if result.file_hash: + # Reuse the conflict-scan registry (line above): still valid under + # the same lock — nothing mutated it between scan and write. 
+ doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") + meta = { + "name": file_path.name, + "doc_name": doc_name, + "type": doc_type, + "path": _registry_path(file_path, kb_dir), + } + if result.raw_path is not None: + meta["raw_path"] = _registry_path(result.raw_path, kb_dir) + if result.source_path is not None: + meta["source_path"] = _registry_path(result.source_path, kb_dir) + if index_result is not None: + meta["doc_id"] = index_result.doc_id + registry.remove_by_doc_name(doc_name) + for existing_hash, existing_meta in list(registry.all_entries().items()): + if ( + existing_hash != result.file_hash + and not existing_meta.get("doc_name") + and existing_meta.get("name") == file_path.name + ): + registry.remove_by_hash(existing_hash) + registry.add(result.file_hash, meta) + + # Commit point: the registry write is durable. Mark the journal + # committed so that any failure in the post-commit cleanup below + # (log append, backup removal) cannot trigger a recovery rollback + # that discards this completed ingest. mark_committed is the last + # step inside the try — together with registry.add it forms the + # atomic commit group, so a failure here correctly rolls back. + snapshot.mark_committed() + except Exception: + if snapshot is None: + # snapshot_paths failed before any mutation ran; it already removed + # its own backup dir, so there is nothing to roll back. + click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.") + return "failed" + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) + return "failed" + finally: + _cleanup_staging(prepared.staging_dir) + + # Post-commit side effects. 
These run only after the mutation is + # committed and the journal marked committed, so a failure here must NOT + # roll back — best-effort only. A stale "committed" journal left behind + # is harmless: the next recover_pending_journals sees status "committed" + # and discards it. + try: + append_log(kb_dir / "wiki", "ingest", file_path.name) + except Exception as exc: + logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc) + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo( + f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}" + ) + click.echo(f" [OK] {file_path.name} added to knowledge base.") + return "added" + + +@contextmanager +def _kb_mutation_lock(kb_dir: Path): + """Acquire the ingest lock and drain pending mutation journals first. + + Draining recovery is part of taking the mutation lock: a process that + acquires it must restore the KB to a known state before mutating, so any + journal left by a prior interrupted run is rolled back (and a terminal + one discarded) before the caller runs. Recovery messages indicate a prior + interrupted run, so they surface at WARNING. + """ with kb_ingest_lock(kb_dir / ".openkb"): - return _add_single_file_locked(file_path, kb_dir) + for message in recover_pending_journals(kb_dir): + logging.getLogger(__name__).warning(message) + yield + +def add_single_file(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome: + """Convert, index, and compile a single document under the KB mutation lock. -def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: + ``stage=True`` converts into an isolated staging dir before the commit + snapshot, so a crash between convert and commit can't orphan raw/source + files in the live KB. Callers whose file already lives in ``raw/`` (watch + mode, URL fetch) pass ``stage=False`` to keep convert's in-place + optimization. 
+ """ + with _kb_mutation_lock(kb_dir): + return _add_single_file_locked(file_path, kb_dir, stage=stage) + + +def _add_single_file_locked(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome: """Convert, index, and compile a single document into the knowledge base. Steps: @@ -297,106 +670,19 @@ def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", " be an orphan) while preserving it on failure so the user can retry without re-downloading. """ - from openkb.agent.compiler import compile_long_doc, compile_short_doc - from openkb.state import HashRegistry - - logger = logging.getLogger(__name__) openkb_dir = kb_dir / ".openkb" config = load_config(openkb_dir / "config.yaml") _setup_llm_key(kb_dir) model: str = config.get("model", DEFAULT_CONFIG["model"]) - # 2. Convert document click.echo(f"Adding: {file_path.name}") - try: - result = convert_document(file_path, kb_dir) - except Exception as exc: - click.echo(f" [ERROR] Conversion failed: {exc}") - logger.debug("Conversion traceback:", exc_info=True) - return "failed" - - if result.skipped: - click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") - return "skipped" - - doc_name = result.doc_name or file_path.stem - index_result = None # populated only on the long-doc branch - - # 3/4. 
Index and compile - if result.is_long_doc: - click.echo(f" Long document detected — indexing with PageIndex...") - try: - from openkb.indexer import index_long_document - index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name) - except Exception as exc: - click.echo(f" [ERROR] Indexing failed: {exc}") - logger.debug("Indexing traceback:", exc_info=True) - return "failed" - - summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...") - for attempt in range(2): - try: - asyncio.run( - compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model, - doc_description=index_result.description) - ) - break - except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - else: - click.echo(f" Compiling short doc...") - for attempt in range(2): - try: - asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model)) - break - except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - - # Register hash only after successful compilation - if result.file_hash: - # Construct the registry NOW, not earlier: convert_document may have - # backfilled a legacy entry (doc_name/path) on disk via its own - # instance, and an earlier snapshot would clobber that backfill on - # the full rewrite in add(). 
- registry = HashRegistry(openkb_dir / "hashes.json") - doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") - meta = { - "name": file_path.name, - "doc_name": doc_name, - "type": doc_type, - "path": _registry_path(file_path, kb_dir), - } - if result.raw_path is not None: - meta["raw_path"] = _registry_path(result.raw_path, kb_dir) - if result.source_path is not None: - meta["source_path"] = _registry_path(result.source_path, kb_dir) - # For long PDFs we also persist the PageIndex doc_id so `openkb - # remove` can later call ``Collection.delete_document(doc_id)`` - # to free the managed PDF copy + SQLite row. - if index_result is not None: - meta["doc_id"] = index_result.doc_id - # An edited document arrives with a new content hash; drop the - # stale entry for the same doc_name so the registry keeps exactly - # one entry per document. - registry.remove_by_doc_name(doc_name) - registry.add(result.file_hash, meta) - - append_log(kb_dir / "wiki", "ingest", file_path.name) - click.echo(f" [OK] {file_path.name} added to knowledge base.") - return "added" + # Stage unless the file already lives in raw/ (watch/URL), where convert's + # in-place optimization applies. Staging keeps convert's writes out of the + # live KB so a crash between convert and the commit snapshot can't orphan + # raw/source files with no journal to recover them. 
+ staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None + prepared = _prepare_add_file(file_path, kb_dir, staging_dir=staging_dir) + return _commit_prepared_add(prepared, kb_dir, model) # --------------------------------------------------------------------------- @@ -602,6 +888,7 @@ def init(model, language): "model": model, "language": language, "pageindex_threshold": DEFAULT_CONFIG["pageindex_threshold"], + "file_processing_jobs": DEFAULT_CONFIG["file_processing_jobs"], } save_config(openkb_dir / "config.yaml", config) atomic_write_json(openkb_dir / "hashes.json", {}) @@ -625,7 +912,6 @@ def init(model, language): @cli.command() @click.argument("path") @click.pass_context -@_with_kb_lock(exclusive=True) def add(ctx, path): """Add a document or directory of documents at PATH to the knowledge base. @@ -647,10 +933,11 @@ def add(ctx, path): # that the registry can't reach via openkb remove. from openkb.url_ingest import looks_like_url, fetch_url_to_raw if looks_like_url(path): - fetched = fetch_url_to_raw(path, kb_dir) - if fetched is None: - return - outcome = add_single_file(fetched, kb_dir) + with _kb_mutation_lock(kb_dir): + fetched = fetch_url_to_raw(path, kb_dir) + if fetched is None: + return + outcome = _add_single_file_locked(fetched, kb_dir, stage=False) # Only clean up on dedup-skip. On "failed" we keep the file so # the user can retry (e.g. 
transient LLM error during compile) # without re-downloading — and so they don't lose data when @@ -674,9 +961,45 @@ def add(ctx, path): return total = len(files) click.echo(f"Found {total} supported file(s) in {path}.") - for i, f in enumerate(files, 1): - click.echo(f"\n[{i}/{total}] ", nl=False) - add_single_file(f, kb_dir) + config = load_config(kb_dir / ".openkb" / "config.yaml") + jobs = min(total, _positive_int(config.get("file_processing_jobs"), DEFAULT_CONFIG["file_processing_jobs"])) + _setup_llm_key(kb_dir) + model = config.get("model", DEFAULT_CONFIG["model"]) + with _kb_mutation_lock(kb_dir): + reserved_doc_names = _reserve_batch_doc_names(files, kb_dir) + if jobs == 1: + for i, f in enumerate(files, 1): + click.echo(f"\n[{i}/{total}] ", nl=False) + # Stage exactly like the multi-worker path: prepare must never + # write the live KB unlocked. staging_dir=None would make + # convert_document write raw/ and wiki/sources/ straight into + # kb_dir with no lock and no mutation journal (orphan on crash). + prepared = _prepare_add_file( + f, kb_dir, staging_dir=_staging_dir_for(kb_dir, f), doc_name=reserved_doc_names[f] + ) + with _kb_mutation_lock(kb_dir): + _commit_prepared_add(prepared, kb_dir, model) + return + + click.echo(f"Preparing files with {jobs} worker(s).") + with ThreadPoolExecutor(max_workers=jobs) as executor: + futures = { + f: executor.submit( + _prepare_add_file, + f, + kb_dir, + _staging_dir_for(kb_dir, f), + reserved_doc_names[f], + ) + for f in files + } + # Commit in scan order even though prepare finishes out of order. + # That keeps log.md and CLI progress stable for human audit. 
+ for i, f in enumerate(files, 1): + prepared = futures[f].result() + click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}") + with _kb_mutation_lock(kb_dir): + _commit_prepared_add(prepared, kb_dir, model) else: if target.suffix.lower() not in SUPPORTED_EXTENSIONS: click.echo( @@ -1412,7 +1735,7 @@ def on_new_files(paths): f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}" ) continue - add_single_file(fp, kb_dir) + add_single_file(fp, kb_dir, stage=False) click.echo(f"Watching {raw_dir} for new documents. Press Ctrl+C to stop.") watch_directory(raw_dir, on_new_files) diff --git a/openkb/config.py b/openkb/config.py index 9d0d6cd4..4c44addd 100644 --- a/openkb/config.py +++ b/openkb/config.py @@ -16,6 +16,7 @@ "model": "gpt-5.4-mini", "language": "en", "pageindex_threshold": 20, + "file_processing_jobs": 2, } # Default entity-type vocabulary. Overridable per-KB via the optional diff --git a/openkb/converter.py b/openkb/converter.py index 9ebac762..b5c2add6 100644 --- a/openkb/converter.py +++ b/openkb/converter.py @@ -14,6 +14,7 @@ from openkb.config import load_config from openkb.images import copy_relative_images, extract_base64_images, convert_pdf_with_images +from openkb.locks import atomic_write_text, kb_ingest_lock from openkb.state import HashRegistry logger = logging.getLogger(__name__) @@ -29,6 +30,7 @@ class ConvertResult: skipped: bool = False file_hash: str | None = None # For deferred hash registration doc_name: str | None = None # Stable wiki name (collision-resistant) + staging_dir: Path | None = None def _registry_path(path: Path, kb_dir: Path) -> str: @@ -70,7 +72,13 @@ def _name_taken(candidate: str, registry: HashRegistry) -> bool: return False -def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: +def resolve_doc_name( + src: Path, + kb_dir: Path, + registry: HashRegistry, + *, + persist_legacy: bool = True, +) -> str: """Resolve the stable wiki name for ``src`` (Scheme A). 
Identity is keyed by path: a source we've seen before (same path, even @@ -93,9 +101,10 @@ def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: file_hash, meta = legacy meta = dict(meta) name = meta.get("doc_name") or Path(meta.get("name", "")).stem - meta["doc_name"] = name - meta["path"] = path_key - registry.add(file_hash, meta) # backfill + persist + if persist_legacy: + meta["doc_name"] = name + meta["path"] = path_key + registry.add(file_hash, meta) # backfill + persist return name candidate = _sanitize_stem(src.stem) @@ -111,7 +120,14 @@ def get_pdf_page_count(path: Path) -> int: return doc.page_count -def convert_document(src: Path, kb_dir: Path) -> ConvertResult: +def convert_document( + src: Path, + kb_dir: Path, + *, + assume_locked: bool = False, + staging_dir: Path | None = None, + doc_name_override: str | None = None, +) -> ConvertResult: """Convert a document and integrate it into the knowledge base. Steps: @@ -122,12 +138,23 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: 5. Otherwise — run MarkItDown, extract base64 images, save to ``wiki/sources/``. 6. Register hash in the registry. 
""" + if not assume_locked: + with kb_ingest_lock(kb_dir / ".openkb"): + return convert_document( + src, + kb_dir, + assume_locked=True, + staging_dir=staging_dir, + doc_name_override=doc_name_override, + ) + # ------------------------------------------------------------------ # Load config & state # ------------------------------------------------------------------ openkb_dir = kb_dir / ".openkb" config = load_config(openkb_dir / "config.yaml") threshold: int = config.get("pageindex_threshold", 20) + artifact_root = staging_dir if staging_dir is not None else kb_dir registry = HashRegistry(openkb_dir / "hashes.json") # ------------------------------------------------------------------ @@ -141,15 +168,23 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: skipped=True, file_hash=file_hash, doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, + staging_dir=staging_dir, ) - doc_name = resolve_doc_name(src, kb_dir, registry) + # Batch ingest reserves doc_names before parallel conversion so same-stem + # files behave like serial adds while still writing to isolated staging dirs. + doc_name = doc_name_override or resolve_doc_name( + src, + kb_dir, + registry, + persist_legacy=staging_dir is None, + ) # ------------------------------------------------------------------ # 2. Copy to raw/ # ------------------------------------------------------------------ - raw_dir = kb_dir / "raw" + raw_dir = artifact_root / "raw" raw_dir.mkdir(parents=True, exist_ok=True) - if src.resolve().is_relative_to(raw_dir.resolve()): + if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()): # Watch mode: the file already lives in raw/ — don't copy/rename. raw_dest = src else: @@ -173,14 +208,15 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: is_long_doc=True, file_hash=file_hash, doc_name=doc_name, + staging_dir=staging_dir, ) # ------------------------------------------------------------------ # 4/5. 
Convert to Markdown # ------------------------------------------------------------------ - sources_dir = kb_dir / "wiki" / "sources" + sources_dir = artifact_root / "wiki" / "sources" sources_dir.mkdir(parents=True, exist_ok=True) - images_dir = kb_dir / "wiki" / "sources" / "images" / doc_name + images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name images_dir.mkdir(parents=True, exist_ok=True) if src.suffix.lower() == ".md": @@ -197,11 +233,12 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: markdown = extract_base64_images(markdown, doc_name, images_dir) dest_md = sources_dir / f"{doc_name}.md" - dest_md.write_text(markdown, encoding="utf-8") + atomic_write_text(dest_md, markdown) return ConvertResult( raw_path=raw_dest, source_path=dest_md, file_hash=file_hash, doc_name=doc_name, + staging_dir=staging_dir, ) diff --git a/openkb/state.py b/openkb/state.py index 10cb20cc..c7302770 100644 --- a/openkb/state.py +++ b/openkb/state.py @@ -13,12 +13,29 @@ class HashRegistry: def __init__(self, path: Path) -> None: self._path = path + self._persist_enabled = True if path.exists(): with path.open("r", encoding="utf-8") as fh: self._data: dict[str, dict] = json.load(fh) else: self._data = {} + @classmethod + def memory(cls, entries: dict[str, dict]) -> "HashRegistry": + """An in-memory view over ``entries`` that never writes back to disk. + + Shares the on-disk registry's read/resolve contract (``get_by_path``, + ``find_legacy_by_stem``, ``all_entries``, ``add``) so callers that only + need to stage mutations in memory — batch doc_name reservation — + reuse the same code path instead of a parallel reimplementation. + ``add`` updates the in-memory dict but skips persistence. 
+ """ + reg = cls.__new__(cls) + reg._path = None + reg._persist_enabled = False + reg._data = {key: dict(value) for key, value in entries.items()} + return reg + # ------------------------------------------------------------------ # Query helpers # ------------------------------------------------------------------ @@ -115,6 +132,8 @@ def remove_by_hash(self, file_hash: str) -> bool: # ------------------------------------------------------------------ def _persist(self) -> None: + if not self._persist_enabled: + return atomic_write_json(self._path, self._data) # ------------------------------------------------------------------ diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 0199c9e2..4243c7e0 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -80,13 +80,20 @@ def test_add_directory_calls_helper_for_each_file(self, tmp_path): (docs_dir / "b.txt").write_text("B content") (docs_dir / "ignore.xyz").write_text("skip me") + from openkb.cli import _PreparedAdd + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir) + runner = CliRunner() - with patch("openkb.cli.add_single_file") as mock_add, \ + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \ + patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \ patch("openkb.cli._find_kb_dir", return_value=kb_dir): runner.invoke(cli, ["add", str(docs_dir)]) - # Should be called for .md and .txt but not .xyz - assert mock_add.call_count == 2 - called_names = {call.args[0].name for call in mock_add.call_args_list} + # Should be prepared/committed for .md and .txt but not .xyz + assert mock_prepare.call_count == 2 + assert mock_commit.call_count == 2 + called_names = {call.args[0].name for call in mock_prepare.call_args_list} assert "a.md" in called_names assert "b.txt" in called_names assert "ignore.xyz" not in called_names @@ -168,6 
+175,45 @@ def test_add_short_doc_runs_compiler(self, tmp_path): assert "path" in meta assert "stale-old-hash" not in hashes + def test_commit_keeps_journal_when_rollback_fails(self, tmp_path): + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + source_path = kb_dir / "wiki" / "sources" / "broken.md" + source_path.write_text("# Broken", encoding="utf-8") + prepared = _PreparedAdd( + file_path=tmp_path / "broken.md", + result=ConvertResult( + raw_path=kb_dir / "raw" / "broken.md", + source_path=source_path, + file_hash="beadfeed00" * 8, + doc_name="broken", + ), + ) + + class FakeSnapshot: + journal_path = kb_dir / ".openkb" / "journal" / "broken.json" + + def __init__(self): + self.discard_called = False + + def rollback_best_effort(self): + return RuntimeError("rollback failed") + + def discard_best_effort(self): + self.discard_called = True + + fake_snapshot = FakeSnapshot() + + with patch("openkb.cli.snapshot_paths", return_value=fake_snapshot), \ + patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "failed" + assert fake_snapshot.discard_called is False + def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path): """Editing a pre-doc_name-era document must not fork the registry. 
@@ -203,3 +249,368 @@ def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path): new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"] assert len(new_entries) == 1 # …exactly one entry survives assert new_entries[0]["path"] # with path identity persisted + + def test_add_directory_legacy_entry_converges_to_single_entry(self, tmp_path): + import json as json_mod + + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "old-hash", {"name": "notes.md", "type": "md"} + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "notes.md").write_text("# Notes, edited", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, ["add", str(docs_dir)]) + assert "OK" in result.output + + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + assert "old-hash" not in hashes + new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"] + assert len(new_entries) == 1 + assert new_entries[0]["path"] + + def test_add_directory_same_stem_files_get_reserved_names(self, tmp_path): + import json as json_mod + + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + (docs_dir / "a").mkdir(parents=True) + (docs_dir / "b").mkdir(parents=True) + (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8") + (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, 
["add", str(docs_dir)]) + assert "Document name conflict" not in result.output + assert result.output.count("[OK]") == 2 + + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + doc_names = {meta["doc_name"] for meta in hashes.values()} + assert len(doc_names) == 2 + assert "report" in doc_names + assert any(name.startswith("report-") for name in doc_names) + + def test_add_directory_same_stem_with_legacy_entry_no_duplicate(self, tmp_path): + """Two same-stem files plus a legacy (path-less) entry must not both + reserve the legacy doc_name. ``find_legacy_by_stem`` must be consumed + (idempotent) across the batch so the second file gets a suffixed name + instead of colliding with the first. + """ + import json as json_mod + + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + # Legacy entry: name + doc_name but NO path → find_legacy_by_stem matches "report". 
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "legacy-hash", {"name": "report.md", "doc_name": "report", "type": "md"} + ) + docs_dir = tmp_path / "docs" + (docs_dir / "a").mkdir(parents=True) + (docs_dir / "b").mkdir(parents=True) + (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8") + (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, ["add", str(docs_dir)]) + assert "Document name conflict" not in result.output + assert result.output.count("[OK]") == 2 + + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + report_names = [ + m["doc_name"] for m in hashes.values() if str(m.get("doc_name", "")).startswith("report") + ] + assert len(report_names) == 2 + assert len(set(report_names)) == 2 # no silent overwrite + + def test_commit_rejects_same_filename_different_path_conflict(self, tmp_path): + """A path-indexed entry sharing doc_name + filename but with a + different path (a concurrent add of a same-named file in the + reservation/commit window) must be rejected, not silently + overwritten via the filename escape. + """ + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + source_path = kb_dir / "wiki" / "sources" / "report.md" + source_path.parent.mkdir(parents=True, exist_ok=True) + source_path.write_text("# mine", encoding="utf-8") + + # A DIFFERENT document already owns doc_name "report": different path, + # different hash, same filename. 
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "other-hash", + {"name": "report.md", "doc_name": "report", "type": "md", + "path": "elsewhere/report.md"}, + ) + + prepared = _PreparedAdd( + file_path=tmp_path / "report.md", + result=ConvertResult( + raw_path=kb_dir / "raw" / "report.md", + source_path=source_path, + file_hash="myhash" + "0" * 59, + doc_name="report", + ), + ) + + with patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "failed" + hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")) + # The pre-existing document is untouched. + assert "other-hash" in hashes + assert hashes["other-hash"]["path"] == "elsewhere/report.md" + assert "myhash" + "0" * 59 not in hashes + + def test_add_directory_jobs1_stages_each_file(self, tmp_path): + """jobs==1 must stage each file (pass a real staging_dir) instead of + writing the live KB unlocked via staging_dir=None. 
+ """ + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 1\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "a.md").write_text("# A", encoding="utf-8") + (docs_dir / "b.md").write_text("# B", encoding="utf-8") + + from openkb.cli import _PreparedAdd + + seen_staging: list = [] + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + seen_staging.append(staging_dir) + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir) + + runner = CliRunner() + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \ + patch("openkb.cli._commit_prepared_add", return_value="added"), \ + patch("openkb.cli._find_kb_dir", return_value=kb_dir): + runner.invoke(cli, ["add", str(docs_dir)]) + + assert len(seen_staging) == 2 + assert all(s is not None for s in seen_staging) # regression: was None + + def test_commit_returns_added_when_post_commit_cleanup_fails(self, tmp_path): + """Once the registry write lands, a failure in journal cleanup must + NOT roll back the completed ingest (regression: the discard/log used + to live inside the rollback try). 
+ """ + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + source_path = kb_dir / "wiki" / "sources" / "ok.md" + source_path.parent.mkdir(parents=True, exist_ok=True) + source_path.write_text("# OK", encoding="utf-8") + prepared = _PreparedAdd( + file_path=tmp_path / "ok.md", + result=ConvertResult( + raw_path=kb_dir / "raw" / "ok.md", + source_path=source_path, + file_hash="ok" + "0" * 62, + doc_name="ok", + ), + ) + + class FakeSnapshot: + journal_path = kb_dir / ".openkb" / "journal" / "ok.json" + + def mark_committed(self): + pass + + def rollback_best_effort(self): + return None + + def discard_best_effort(self): + return RuntimeError("cleanup failed") + + with patch("openkb.cli.snapshot_paths", return_value=FakeSnapshot()), \ + patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "added" # regression: was "failed" (rolled back success) + hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")) + assert "ok" + "0" * 62 in hashes # registry write survived + + def test_commit_rolls_back_real_snapshot_on_compile_failure(self, tmp_path): + """End-to-end rollback: a REAL snapshot + REAL publish, then a compile + failure, must restore the KB to its pre-add state — published raw and + source files removed, registry unchanged, no orphaned artifacts or + journal. The FakeSnapshot-based test cannot exercise this transactional + guarantee (the whole reason the feature exists). 
+ """ + import json as json_mod + + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + pre_hashes = (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + + # A staging dir holding the converted artifacts that publish_staged_tree + # copies into the live KB before compile runs. + staging = tmp_path / "staging" + (staging / "raw").mkdir(parents=True) + (staging / "wiki" / "sources").mkdir(parents=True) + (staging / "raw" / "boom.md").write_text("# raw", encoding="utf-8") + source_md = staging / "wiki" / "sources" / "boom.md" + source_md.write_text("# converted", encoding="utf-8") + + prepared = _PreparedAdd( + file_path=tmp_path / "boom.md", + result=ConvertResult( + raw_path=staging / "raw" / "boom.md", + source_path=source_md, + file_hash="boom" + "0" * 60, + doc_name="boom", + ), + staging_dir=staging, + ) + + with patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")), \ + patch("openkb.cli.time.sleep"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "failed" + # Published artifacts were rolled back (removed). + assert not (kb_dir / "raw" / "boom.md").exists() + assert not (kb_dir / "wiki" / "sources" / "boom.md").exists() + assert not (kb_dir / "wiki" / "summaries" / "boom.md").exists() + # Registry restored to pre-add state; no leaked boom entry. + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + assert hashes == json.loads(pre_hashes) + assert "boom" + "0" * 60 not in hashes + # No orphan journal/backup left behind; staging cleaned up. 
+ assert not any((kb_dir / ".openkb" / "journal").glob("*.json")) + assert not staging.exists() + + def test_add_snapshot_rolls_back_pageindex_sqlite_sidecars(self, tmp_path): + """Long-doc failures must not leave SQLite sidecars newer than pageindex.db.""" + from openkb.cli import _snapshot_add_paths + from openkb.mutation import snapshot_paths + + kb_dir = self._setup_kb(tmp_path) + openkb_dir = kb_dir / ".openkb" + (openkb_dir / "pageindex.db").write_bytes(b"before") + + snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, "long", None, None), + operation="add", + details={}, + ) + + for suffix in ("-wal", "-shm", "-journal"): + (openkb_dir / f"pageindex.db{suffix}").write_bytes(b"after") + + snapshot.rollback() + snapshot.discard() + + assert (openkb_dir / "pageindex.db").read_bytes() == b"before" + for suffix in ("-wal", "-shm", "-journal"): + assert not (openkb_dir / f"pageindex.db{suffix}").exists() + + def test_add_single_file_stages_unless_file_already_in_raw(self, tmp_path): + """stage=True (default for single-file add / chat) routes convert + through an isolated staging dir; stage=False (watch / URL, file + already in raw/) keeps convert's in-place path. 
The staging default + closes the crash-orphan window for files that don't already live in + raw/.""" + from openkb.cli import _PreparedAdd, _add_single_file_locked + + kb_dir = self._setup_kb(tmp_path) + doc = tmp_path / "test.md" + doc.write_text("# hi", encoding="utf-8") + + captured: list = [] + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + captured.append(staging_dir) + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir, outcome="skipped") + + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \ + patch("openkb.cli._commit_prepared_add", return_value="skipped"): + _add_single_file_locked(doc, kb_dir) # default stage=True + _add_single_file_locked(doc, kb_dir, stage=False) + + assert captured[0] is not None # staged by default → no live-KB write pre-snapshot + assert captured[1] is None # in-place for watch/URL (file already in raw/) + + def test_commit_conflict_guard_normalizes_unicode_filenames(self, tmp_path): + """A legacy (path-less) entry whose name is stored NFC must match a + file whose name the filesystem reports as NFD (macOS HFS+/APFS), so a + same-document re-add is allowed instead of mis-reported as a conflict. 
+ The guard NFKC-normalizes both sides; a raw ``==`` would diverge.""" + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + import unicodedata as _ud + nfc_name = "r\u00e9sum\u00e9.pdf" # NFC: é = U+00E9 (composed) + nfd_name = _ud.normalize("NFD", nfc_name) # NFD: e + U+0301 (decomposed) + assert nfc_name != nfd_name # raw bytes differ + + source_path = kb_dir / "wiki" / "sources" / "resume.md" + source_path.parent.mkdir(parents=True, exist_ok=True) + source_path.write_text("# cv", encoding="utf-8") + + HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "legacy-hash", {"name": nfc_name, "doc_name": "resume", "type": "pdf"} + ) + + prepared = _PreparedAdd( + file_path=tmp_path / nfd_name, + result=ConvertResult( + raw_path=kb_dir / "raw" / "resume.pdf", + source_path=source_path, + file_hash="new" + "0" * 61, + doc_name="resume", + ), + ) + + with patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + # NFC-vs-NFD is the same document, not a conflict → ingest proceeds. + assert outcome == "added" From 5ee4c33989a2bb77bbe71527dd984ee777c4e6d7 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Fri, 19 Jun 2026 18:23:24 +0800 Subject: [PATCH 3/7] docs: clarify concurrent add configuration Document file_processing_jobs: add it to the Settings yaml example and note (README Advanced options + config.yaml.example) that only file preparation is parallelized while live-KB mutation stays serialized, so raising it helps mainly when conversion is the bottleneck. 
--- README.md | 2 ++ config.yaml.example | 3 +++ 2 files changed, 5 insertions(+) diff --git a/README.md b/README.md index 86b3701c..d797cd33 100644 --- a/README.md +++ b/README.md @@ -348,6 +348,8 @@ Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/p Advanced options (entity_types, extra_headers, OAuth):
+`file_processing_jobs` (default `2`): number of files prepared concurrently during `openkb add <dir>`. Only the preparation stage is parallelized (hashing, duplicate prefiltering, raw/source staging, conversion); live-KB mutation stays serialized under the mutation lock, so raising it helps mainly when conversion is the bottleneck. + `entity_types` (optional): a YAML list overriding the entity-type vocabulary used for entity pages; omit it to use the default `person`, `organization`, `place`, `product`, `work`, `event`, `other`. `extra_headers` (optional): a YAML mapping of extra HTTP headers sent with every LLM request (forwarded to LiteLLM's `extra_headers`). Useful for providers that expect custom headers, e.g. GitHub Copilot IDE-auth headers: diff --git a/config.yaml.example b/config.yaml.example index 7f9b5743..aff75f0a 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -2,6 +2,9 @@ model: gpt-5.4 # LLM model (any LiteLLM-supported provider) language: en # Wiki output language pageindex_threshold: 20 # PDF pages threshold for PageIndex file_processing_jobs: 2 # Number of files to prepare concurrently during `openkb add <dir>` +# Note: this parallelizes hashing/conversion/staging only. Live KB publish, +# PageIndex indexing, LLM compilation, registry updates, and log writes remain +# serialized under the KB mutation lock. # Optional: extra HTTP headers sent with every LLM request (forwarded to # LiteLLM's extra_headers). Some providers need these — e.g. GitHub Copilot From 8714c5814778a9147c94043f3474be360f3e6a55 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Fri, 19 Jun 2026 18:24:27 +0800 Subject: [PATCH 4/7] feat: add add-pipeline timing diagnostics Add DEBUG-level timing logs across the add pipeline (lock_wait, prefilter, prepare, index, compile, commit) via _log_add_timing, gated behind isEnabledFor(DEBUG) so there is no cost when disabled. Surfaces where time goes during concurrent directory add.
--- openkb/cli.py | 52 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index 7d91ae5e..f9e38fd4 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -299,6 +299,21 @@ def _positive_int(value: object, default: int) -> int: return max(1, parsed) +def _log_add_timing( + stage: str, + file_path: Path | None, + started_at: float, + **fields: object, +) -> None: + logger = logging.getLogger(__name__) + if not logger.isEnabledFor(logging.DEBUG): + return + elapsed = time.perf_counter() - started_at + subject = file_path.name if file_path is not None else "" + suffix = "".join(f" {key}={value}" for key, value in fields.items()) + logger.debug("add %s for %s took %.3fs%s", stage, subject, elapsed, suffix) + + def _reserve_batch_doc_names(files: list[Path], kb_dir: Path) -> dict[Path, str]: """Reserve doc_names in scan order so parallel prepare matches serial add.""" from openkb.state import HashRegistry @@ -357,6 +372,7 @@ def _prepare_add_file( ) -> _PreparedAdd: """Run file-local conversion into an optional staging directory.""" logger = logging.getLogger(__name__) + started = time.perf_counter() try: result = convert_document( file_path, @@ -367,6 +383,7 @@ def _prepare_add_file( ) except Exception as exc: logger.debug("Conversion traceback:", exc_info=True) + _log_add_timing("prepare", file_path, started, outcome="failed") return _PreparedAdd( file_path=file_path, staging_dir=staging_dir, @@ -375,13 +392,17 @@ def _prepare_add_file( error=exc, ) if result.skipped: - return _PreparedAdd( + prepared = _PreparedAdd( file_path=file_path, result=result, staging_dir=staging_dir, outcome="skipped", ) - return _PreparedAdd(file_path=file_path, result=result, staging_dir=staging_dir) + _log_add_timing("prepare", file_path, started, outcome="skipped") + return prepared + prepared = _PreparedAdd(file_path=file_path, result=result, staging_dir=staging_dir) + _log_add_timing("prepare", 
file_path, started, outcome="prepared") + return prepared def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]: @@ -429,9 +450,11 @@ def _run_compile_with_retry(coro_factory, label: str) -> None: """ logger = logging.getLogger(__name__) click.echo(f" {label}...") + started = time.perf_counter() for attempt in range(2): try: asyncio.run(coro_factory()) + _log_add_timing("compile", None, started, label=label) return except Exception as exc: if attempt == 0: @@ -450,26 +473,31 @@ def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _A logger = logging.getLogger(__name__) file_path = prepared.file_path + started = time.perf_counter() + + def finish(outcome: _AddOutcome) -> _AddOutcome: + _log_add_timing("commit", file_path, started, outcome=outcome) + return outcome if prepared.outcome == "failed": click.echo(f" [ERROR] {prepared.error_stage} failed: {prepared.error}") _cleanup_staging(prepared.staging_dir) - return "failed" + return finish("failed") if prepared.outcome == "skipped": click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") _cleanup_staging(prepared.staging_dir) - return "skipped" + return finish("skipped") if prepared.result is None: click.echo(f" [ERROR] Conversion failed: no result for {file_path.name}") _cleanup_staging(prepared.staging_dir) - return "failed" + return finish("failed") result = prepared.result registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") if result.file_hash and registry.is_known(result.file_hash): click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") _cleanup_staging(prepared.staging_dir) - return "skipped" + return finish("skipped") doc_name = result.doc_name or file_path.stem norm_doc_name = unicodedata.normalize("NFKC", doc_name) @@ -501,7 +529,7 @@ def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _A "Re-run this file after the current batch." 
) _cleanup_staging(prepared.staging_dir) - return "failed" + return finish("failed") final_raw, final_source = _final_artifact_paths(result, kb_dir) snapshot: MutationSnapshot | None = None @@ -528,10 +556,12 @@ def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _A if result.is_long_doc: click.echo(" Long document detected — indexing with PageIndex...") + index_started = time.perf_counter() try: from openkb.indexer import index_long_document index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name) + _log_add_timing("index", file_path, index_started, doc_name=doc_name) except Exception as exc: click.echo(f" [ERROR] Indexing failed: {exc}") logger.debug("Indexing traceback:", exc_info=True) @@ -593,7 +623,7 @@ def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _A # snapshot_paths failed before any mutation ran; it already removed # its own backup dir, so there is nothing to roll back. click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.") - return "failed" + return finish("failed") rollback_error = snapshot.rollback_best_effort() if rollback_error is None: snapshot.discard_best_effort() @@ -602,7 +632,7 @@ def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _A " [ERROR] Rollback failed; mutation journal retained for recovery: " f"{snapshot.journal_path}" ) - return "failed" + return finish("failed") finally: _cleanup_staging(prepared.staging_dir) @@ -621,7 +651,7 @@ def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _A f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}" ) click.echo(f" [OK] {file_path.name} added to knowledge base.") - return "added" + return finish("added") @contextmanager @@ -634,7 +664,9 @@ def _kb_mutation_lock(kb_dir: Path): one discarded) before the caller runs. Recovery messages indicate a prior interrupted run, so they surface at WARNING. 
""" + started = time.perf_counter() with kb_ingest_lock(kb_dir / ".openkb"): + _log_add_timing("lock_wait", None, started) for message in recover_pending_journals(kb_dir): logging.getLogger(__name__).warning(message) yield From 088f25229efe5e4f0b42e1b4503fcd6f66f0d27f Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Fri, 19 Jun 2026 18:25:45 +0800 Subject: [PATCH 5/7] perf: prefilter known files before parallel preparation Hash directory inputs up front and skip files whose hash is already in the registry before spawning prepare workers, so known duplicates no longer go through conversion and staging. Hashing runs across the jobs workers; files that fail to hash surface as per-file "failed" outcomes instead of aborting the batch. --- openkb/cli.py | 98 ++++++++++++++++++++++++++++++++++----- tests/test_add_command.py | 43 +++++++++++++++++ 2 files changed, 129 insertions(+), 12 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index f9e38fd4..a0abe7b5 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -314,6 +314,64 @@ def _log_add_timing( logger.debug("add %s for %s took %.3fs%s", stage, subject, elapsed, suffix) +def _prefilter_known_files( + files: list[Path], + kb_dir: Path, + jobs: int, +) -> tuple[list[Path], dict[Path, _PreparedAdd]]: + """Hash directory inputs before conversion and skip hashes already known.""" + from openkb.state import HashRegistry + + started = time.perf_counter() + with _kb_mutation_lock(kb_dir): + registry = HashRegistry.memory( + HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() + ) + + def hash_one(file_path: Path) -> tuple[Path, str | None, Exception | None]: + try: + return file_path, HashRegistry.hash_file(file_path), None + except Exception as exc: + return file_path, None, exc + + if jobs == 1: + hashed = [hash_one(file_path) for file_path in files] + else: + with ThreadPoolExecutor(max_workers=jobs) as executor: + futures = [executor.submit(hash_one, file_path) for file_path in files] + hashed = 
[future.result() for future in futures] + + remaining: list[Path] = [] + prepared: dict[Path, _PreparedAdd] = {} + for file_path, file_hash, error in hashed: + if error is not None: + prepared[file_path] = _PreparedAdd( + file_path=file_path, + outcome="failed", + error_stage="Hash", + error=error, + ) + continue + if file_hash is not None and registry.is_known(file_hash): + prepared[file_path] = _PreparedAdd( + file_path=file_path, + outcome="skipped", + ) + continue + remaining.append(file_path) + + _log_add_timing( + "prefilter", + None, + started, + total=len(files), + remaining=len(remaining), + skipped=len(prepared), + jobs=jobs, + ) + return remaining, prepared + + def _reserve_batch_doc_names(files: list[Path], kb_dir: Path) -> dict[Path, str]: """Reserve doc_names in scan order so parallel prepare matches serial add.""" from openkb.state import HashRegistry @@ -994,21 +1052,35 @@ def add(ctx, path): total = len(files) click.echo(f"Found {total} supported file(s) in {path}.") config = load_config(kb_dir / ".openkb" / "config.yaml") - jobs = min(total, _positive_int(config.get("file_processing_jobs"), DEFAULT_CONFIG["file_processing_jobs"])) + jobs = min( + total, + _positive_int( + config.get("file_processing_jobs"), + DEFAULT_CONFIG["file_processing_jobs"], + ), + ) _setup_llm_key(kb_dir) model = config.get("model", DEFAULT_CONFIG["model"]) + files_to_prepare, prepared_outcomes = _prefilter_known_files(files, kb_dir, jobs) with _kb_mutation_lock(kb_dir): - reserved_doc_names = _reserve_batch_doc_names(files, kb_dir) + reserved_doc_names = _reserve_batch_doc_names(files_to_prepare, kb_dir) if jobs == 1: for i, f in enumerate(files, 1): - click.echo(f"\n[{i}/{total}] ", nl=False) - # Stage exactly like the multi-worker path: prepare must never - # write the live KB unlocked. staging_dir=None would make - # convert_document write raw/ and wiki/sources/ straight into - # kb_dir with no lock and no mutation journal (orphan on crash). 
- prepared = _prepare_add_file( - f, kb_dir, staging_dir=_staging_dir_for(kb_dir, f), doc_name=reserved_doc_names[f] - ) + if f in prepared_outcomes: + prepared = prepared_outcomes[f] + click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}") + else: + click.echo(f"\n[{i}/{total}] ", nl=False) + # Stage exactly like the multi-worker path: prepare must never + # write the live KB unlocked. staging_dir=None would make + # convert_document write raw/ and wiki/sources/ straight into + # kb_dir with no lock and no mutation journal (orphan on crash). + prepared = _prepare_add_file( + f, + kb_dir, + staging_dir=_staging_dir_for(kb_dir, f), + doc_name=reserved_doc_names[f], + ) with _kb_mutation_lock(kb_dir): _commit_prepared_add(prepared, kb_dir, model) return @@ -1023,12 +1095,14 @@ def add(ctx, path): _staging_dir_for(kb_dir, f), reserved_doc_names[f], ) - for f in files + for f in files_to_prepare } # Commit in scan order even though prepare finishes out of order. # That keeps log.md and CLI progress stable for human audit. 
for i, f in enumerate(files, 1): - prepared = futures[f].result() + prepared = prepared_outcomes.get(f) + if prepared is None: + prepared = futures[f].result() click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}") with _kb_mutation_lock(kb_dir): _commit_prepared_add(prepared, kb_dir, model) diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 4243c7e0..168fba73 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -98,6 +98,49 @@ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): assert "b.txt" in called_names assert "ignore.xyz" not in called_names + def test_add_directory_prefilters_known_hashes_before_prepare(self, tmp_path): + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + known = docs_dir / "known.md" + unknown = docs_dir / "unknown.md" + known.write_text("# Known", encoding="utf-8") + unknown.write_text("# Unknown", encoding="utf-8") + + from openkb.cli import _PreparedAdd + from openkb.state import HashRegistry + + HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + HashRegistry.hash_file(known), + { + "name": "known.md", + "doc_name": "known", + "type": "md", + "path": "docs/known.md", + }, + ) + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir) + + runner = CliRunner() + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \ + patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \ + patch("openkb.cli._find_kb_dir", return_value=kb_dir): + result = runner.invoke(cli, ["add", str(docs_dir)]) + + assert result.exception is None + assert [call.args[0].name for call in mock_prepare.call_args_list] == ["unknown.md"] + assert [call.args[0].file_path.name for call in 
mock_commit.call_args_list] == [ + "known.md", + "unknown.md", + ] + assert mock_commit.call_args_list[0].args[0].outcome == "skipped" + def test_add_unsupported_extension(self, tmp_path): kb_dir = self._setup_kb(tmp_path) doc = tmp_path / "file.xyz" From a05994227af9d6eafa85b3e567ee556b51a85328 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Sat, 20 Jun 2026 00:41:56 +0800 Subject: [PATCH 6/7] fix: drain pending mutation journals on every exclusive KB lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit recover_pending_journals was wired only into _kb_mutation_lock (the add path), so remove/recompile/lint/chat — which take kb_ingest_lock directly via _with_kb_lock — never drained. An `openkb add` that crashed mid-commit left an ACTIVE journal that an intervening `openkb remove` ignored and a later `openkb add` then rolled back, clobbering the remove's hashes.json edits and resurrecting the removed document. Move draining into kb_lock's first exclusive acquisition so every mutation entry point restores the KB to a known state before mutating. Delay-import mutation from locks to break the locks<->mutation cycle, and drop the now-redundant drain (plus its double-scan/double-log per file) from _kb_mutation_lock. 
Co-Authored-By: Claude --- openkb/cli.py | 19 +++++++++---------- openkb/locks.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index a0abe7b5..7abbfe2f 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -53,7 +53,7 @@ def filter(self, record: logging.LogRecord) -> bool: from openkb.converter import _registry_path, _sanitize_stem, convert_document, resolve_doc_name from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log -from openkb.mutation import MutationSnapshot, publish_staged_tree, recover_pending_journals, snapshot_paths +from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS # Suppress warnings after all imports — markitdown overrides filters at import time @@ -714,19 +714,18 @@ def finish(outcome: _AddOutcome) -> _AddOutcome: @contextmanager def _kb_mutation_lock(kb_dir: Path): - """Acquire the ingest lock and drain pending mutation journals first. - - Draining recovery is part of taking the mutation lock: a process that - acquires it must restore the KB to a known state before mutating, so any - journal left by a prior interrupted run is rolled back (and a terminal - one discarded) before the caller runs. Recovery messages indicate a prior - interrupted run, so they surface at WARNING. + """Acquire the ingest lock for an add-path mutation. + + Journal draining lives in :func:`openkb.locks.kb_lock` now, so every + exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``, + ``chat`` — drains pending journals on first acquisition, not just the add + path. This wrapper remains the add pipeline's entry point (prepare/commit + run inside the lock); it no longer drains separately, since doing so would + double-scan and double-log on every per-file commit in a directory batch. 
""" started = time.perf_counter() with kb_ingest_lock(kb_dir / ".openkb"): _log_add_timing("lock_wait", None, started) - for message in recover_pending_journals(kb_dir): - logging.getLogger(__name__).warning(message) yield diff --git a/openkb/locks.py b/openkb/locks.py index 2fa2815a..72966fc1 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -9,6 +9,7 @@ import contextlib import json +import logging import os import tempfile import threading @@ -97,6 +98,32 @@ def _local_lock(lock_path: Path) -> _LocalRwLock: return lock +def _drain_pending_journals(openkb_dir: Path) -> None: + """Roll back any mutation journals an interrupted process left behind. + + Draining recovery is part of *taking* the mutation lock, not part of any + one command: a process that acquires the exclusive lock must restore the + KB to a known state before mutating it. Wiring this into ``kb_lock`` means + every exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``, + ``chat`` — drains on first acquisition, so an ``add`` that crashed mid- + commit cannot leave an active journal on disk that a later ``add`` rolls + back over the top of an intervening ``remove``/``recompile`` (clobbering + those edits). ``openkb_dir`` is the ``kb_dir/.openkb`` directory callers + pass to ``kb_lock``; the journal lives at ``openkb_dir/journal``, so the + KB root is ``openkb_dir.parent``. + + The delayed import breaks the ``locks`` ↔ ``mutation`` cycle (``mutation`` + imports atomic-write helpers from this module at top level). Called only + on first OS-lock acquisition (the reentrant branch above returns early), + never on a read lock, so queries pay nothing. 
+ """ + from openkb.mutation import recover_pending_journals + + log = logging.getLogger(__name__) + for message in recover_pending_journals(openkb_dir.parent): + log.warning(message) + + @contextlib.contextmanager def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: """Hold a KB-level advisory lock.""" @@ -134,6 +161,8 @@ def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: flock(fh, exclusive=exclusive) held[resolved] = (1, 0) if exclusive else (0, 1) try: + if exclusive: + _drain_pending_journals(openkb_dir) yield finally: held.pop(resolved, None) From a48d0547c70a9f6d82cd455abe0e7beef360ab83 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Sat, 20 Jun 2026 00:41:56 +0800 Subject: [PATCH 7/7] test: cover lock-driven recovery and jobs>1 add pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - test_exclusive_lock_drains_active_journal_before_yielding: regression guard for the lock-level drain — an ACTIVE journal left by a crashed add is rolled back the moment any exclusive lock is taken, not only on the add path. - test_add_directory_jobs_gt1_runs_real_pipeline: end-to-end exercise of the jobs>1 ThreadPoolExecutor branch (real prepare + real commit, only LLM compile mocked). Every prior jobs>1 test mocked both halves, so futures ordering, staging publish, registry writes, and cleanup were never run. Co-Authored-By: Claude --- tests/test_add_command.py | 39 +++++++++++++++++++++++++++++++++++++++ tests/test_mutation.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 168fba73..f4f2f68c 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -657,3 +657,42 @@ def test_commit_conflict_guard_normalizes_unicode_filenames(self, tmp_path): # NFC-vs-NFD is the same document, not a conflict → ingest proceeds. 
assert outcome == "added" + + def test_add_directory_jobs_gt1_runs_real_pipeline(self, tmp_path): + """End-to-end test of the jobs>1 ThreadPoolExecutor path. + + Other jobs>1 tests mock both _prepare_add_file and _commit_prepared_add, so the + real concurrent branch — scan-order futures, _staging_dir_for allocation, the + prepared_outcomes.get(f) → futures[f].result() fallback, publish_staged_tree, + registry writes, staging cleanup — never runs. Here prepare and commit are real + and only the LLM compile is mocked, so the most complex new path is exercised. + """ + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 3\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + for letter in ("a", "b", "c"): + (docs_dir / f"{letter}.md").write_text(f"# {letter}", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, ["add", str(docs_dir)]) + + assert result.exception is None, result.output + assert result.output.count("[OK]") == 3 + hashes = json.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + assert len(hashes) == 3 + assert {meta["doc_name"] for meta in hashes.values()} == {"a", "b", "c"} + # Staging dirs cleaned up after each commit. + staging = kb_dir / ".openkb" / "staging" + if staging.exists(): + assert not any(p.name.startswith("add-") for p in staging.iterdir()) + # Source artifacts published from staging into the live KB.
+ for letter in ("a", "b", "c"): + assert (kb_dir / "wiki" / "sources" / f"{letter}.md").exists() diff --git a/tests/test_mutation.py b/tests/test_mutation.py index c08ff89f..1169c967 100644 --- a/tests/test_mutation.py +++ b/tests/test_mutation.py @@ -76,3 +76,36 @@ def test_snapshot_paths_cleans_backup_dir_on_failure(tmp_path): staging = kb_dir / ".openkb" / "staging" if staging.exists(): assert not any(staging.iterdir()) # no orphan rollback- dir + + +def test_exclusive_lock_drains_active_journal_before_yielding(tmp_path): + """Recovery runs on every exclusive-lock acquisition, not just the add path. + + ``recover_pending_journals`` is wired into ``kb_lock``'s first exclusive + acquisition, so any mutation command — ``remove``/``recompile``/``lint``/ + ``chat``, all of which take ``kb_ingest_lock`` directly — drains a crashed + predecessor's active journal before it mutates. This is the regression + guard for the bug where an ``add`` crash left an active journal that an + intervening ``remove`` ignored and a later ``add`` then rolled back over + the remove's edits. + """ + from openkb.locks import kb_ingest_lock + + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + # Simulate a crashed add: snapshot taken, file mutated, but mark_committed + # never ran — an ACTIVE journal is left on disk. + snapshot_paths(kb_dir, [target], operation="add", details={"doc_name": "doc"}) + target.write_text("after", encoding="utf-8") + + # Any exclusive-lock holder drains before its body runs. + with kb_ingest_lock(openkb_dir): + assert target.read_text(encoding="utf-8") == "before" + + assert target.read_text(encoding="utf-8") == "before" + assert not any((openkb_dir / "journal").glob("*.json"))