diff --git a/README.md b/README.md index 100c9fa6..d797cd33 100644 --- a/README.md +++ b/README.md @@ -333,6 +333,7 @@ OpenKB settings are initialized by `openkb init` and stored in `.openkb/config.y model: gpt-5.4 # LLM model (any LiteLLM-supported provider) language: en # Wiki output language pageindex_threshold: 20 # PDF pages threshold for PageIndex +file_processing_jobs: 2 # Files to prepare concurrently during `openkb add <path>` ``` Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/providers) (OpenAI models can omit the prefix): @@ -347,6 +348,8 @@ Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/p Advanced options (entity_types, extra_headers, OAuth):
+`file_processing_jobs` (default `2`): number of files prepared concurrently during `openkb add <path>`. Only the preparation stage is parallelized (hashing, duplicate prefiltering, raw/source staging, conversion); live-KB mutation stays serialized under the mutation lock, so raising it helps mainly when conversion is the bottleneck. + `entity_types` (optional): a YAML list overriding the entity-type vocabulary used for entity pages; omit it to use the default `person`, `organization`, `place`, `product`, `work`, `event`, `other`. `extra_headers` (optional): a YAML mapping of extra HTTP headers sent with every LLM request (forwarded to LiteLLM's `extra_headers`). Useful for providers that expect custom headers, e.g. GitHub Copilot IDE-auth headers: diff --git a/config.yaml.example b/config.yaml.example index 45b4b8c3..aff75f0a 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -1,6 +1,10 @@ model: gpt-5.4 # LLM model (any LiteLLM-supported provider) language: en # Wiki output language pageindex_threshold: 20 # PDF pages threshold for PageIndex +file_processing_jobs: 2 # Number of files to prepare concurrently during `openkb add <path>` +# Note: this parallelizes hashing/conversion/staging only. Live KB publish, +# PageIndex indexing, LLM compilation, registry updates, and log writes remain +# serialized under the KB mutation lock. # Optional: extra HTTP headers sent with every LLM request (forwarded to # LiteLLM's extra_headers). Some providers need these — e.g. 
GitHub Copilot diff --git a/openkb/cli.py b/openkb/cli.py index b48ebc17..7abbfe2f 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -8,11 +8,16 @@ warnings.filterwarnings("ignore") import asyncio +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from dataclasses import dataclass import json import logging import shutil import sys import time +import unicodedata +import uuid from functools import wraps from pathlib import Path from typing import Literal @@ -45,9 +50,10 @@ def filter(self, record: logging.LogRecord) -> bool: DEFAULT_CONFIG, load_config, save_config, load_global_config, register_kb, resolve_extra_headers, set_extra_headers, ) -from openkb.converter import _registry_path, convert_document +from openkb.converter import _registry_path, _sanitize_stem, convert_document, resolve_doc_name from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log +from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS # Suppress warnings after all imports — markitdown overrides filters at import time @@ -67,6 +73,7 @@ def filter(self, record: logging.LogRecord) -> bool: # handled by LiteLLM itself) — no API key env var is needed, so the # missing-key warning would be a false alarm for them. 
_OAUTH_PROVIDERS = {"chatgpt", "github_copilot"} +_AddOutcome = Literal["added", "skipped", "failed"] def _extract_provider(model: str) -> str | None: @@ -274,13 +281,468 @@ def _clear_existing_skill_dir(kb_dir: Path, name: str) -> None: shutil.rmtree(target) -def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: - """Convert, index, and compile a single document under the KB mutation lock.""" +@dataclass +class _PreparedAdd: + file_path: Path + result: object | None = None + staging_dir: Path | None = None + outcome: _AddOutcome | None = None + error_stage: str = "" + error: Exception | None = None + + +def _positive_int(value: object, default: int) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + return default + return max(1, parsed) + + +def _log_add_timing( + stage: str, + file_path: Path | None, + started_at: float, + **fields: object, +) -> None: + logger = logging.getLogger(__name__) + if not logger.isEnabledFor(logging.DEBUG): + return + elapsed = time.perf_counter() - started_at + subject = file_path.name if file_path is not None else "" + suffix = "".join(f" {key}={value}" for key, value in fields.items()) + logger.debug("add %s for %s took %.3fs%s", stage, subject, elapsed, suffix) + + +def _prefilter_known_files( + files: list[Path], + kb_dir: Path, + jobs: int, +) -> tuple[list[Path], dict[Path, _PreparedAdd]]: + """Hash directory inputs before conversion and skip hashes already known.""" + from openkb.state import HashRegistry + + started = time.perf_counter() + with _kb_mutation_lock(kb_dir): + registry = HashRegistry.memory( + HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() + ) + + def hash_one(file_path: Path) -> tuple[Path, str | None, Exception | None]: + try: + return file_path, HashRegistry.hash_file(file_path), None + except Exception as exc: + return file_path, None, exc + + if jobs == 1: + hashed = [hash_one(file_path) for file_path in files] + else: + with 
ThreadPoolExecutor(max_workers=jobs) as executor: + futures = [executor.submit(hash_one, file_path) for file_path in files] + hashed = [future.result() for future in futures] + + remaining: list[Path] = [] + prepared: dict[Path, _PreparedAdd] = {} + for file_path, file_hash, error in hashed: + if error is not None: + prepared[file_path] = _PreparedAdd( + file_path=file_path, + outcome="failed", + error_stage="Hash", + error=error, + ) + continue + if file_hash is not None and registry.is_known(file_hash): + prepared[file_path] = _PreparedAdd( + file_path=file_path, + outcome="skipped", + ) + continue + remaining.append(file_path) + + _log_add_timing( + "prefilter", + None, + started, + total=len(files), + remaining=len(remaining), + skipped=len(prepared), + jobs=jobs, + ) + return remaining, prepared + + +def _reserve_batch_doc_names(files: list[Path], kb_dir: Path) -> dict[Path, str]: + """Reserve doc_names in scan order so parallel prepare matches serial add.""" + from openkb.state import HashRegistry + + # In-memory registry view: same resolve contract as the on-disk registry + # but add() never persists. persist_legacy=True below therefore only + # *consumes* a matched legacy entry in memory (backfilling its path) so a + # later same-stem file in this batch no longer re-matches it via + # find_legacy_by_stem — the idempotency fix. With a persisting registry + # that same call would write disk on every reservation. + batch_registry = HashRegistry.memory( + HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() + ) + reserved: dict[Path, str] = {} + for file_path in files: + doc_name = resolve_doc_name( + file_path, + kb_dir, + batch_registry, + persist_legacy=True, + ) + reserved[file_path] = doc_name + # Future files in the same batch must see this name as already taken; + # otherwise same-stem files prepared in parallel could all claim it. 
+ batch_registry.add( + f"batch:{len(reserved)}:{_registry_path(file_path, kb_dir)}", + { + "name": file_path.name, + "doc_name": doc_name, + "path": _registry_path(file_path, kb_dir), + }, + ) + return reserved + + +def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path: + safe = _sanitize_stem(file_path.stem) + # uuid (not time.time_ns()) for uniqueness: two same-stem files sanitize to + # the same `safe`, so the timestamp was the only differentiator and a + # repeated wall-clock ns would make mkdir(exist_ok=False) crash the batch. + path = kb_dir / ".openkb" / "staging" / f"add-{safe}-{uuid.uuid4().hex[:8]}" + path.mkdir(parents=True, exist_ok=False) + return path + + +def _cleanup_staging(path: Path | None) -> None: + if path is not None: + shutil.rmtree(path, ignore_errors=True) + + +def _prepare_add_file( + file_path: Path, + kb_dir: Path, + staging_dir: Path | None, + doc_name: str | None = None, +) -> _PreparedAdd: + """Run file-local conversion into an optional staging directory.""" + logger = logging.getLogger(__name__) + started = time.perf_counter() + try: + result = convert_document( + file_path, + kb_dir, + assume_locked=True, + staging_dir=staging_dir, + doc_name_override=doc_name, + ) + except Exception as exc: + logger.debug("Conversion traceback:", exc_info=True) + _log_add_timing("prepare", file_path, started, outcome="failed") + return _PreparedAdd( + file_path=file_path, + staging_dir=staging_dir, + outcome="failed", + error_stage="Conversion", + error=exc, + ) + if result.skipped: + prepared = _PreparedAdd( + file_path=file_path, + result=result, + staging_dir=staging_dir, + outcome="skipped", + ) + _log_add_timing("prepare", file_path, started, outcome="skipped") + return prepared + prepared = _PreparedAdd(file_path=file_path, result=result, staging_dir=staging_dir) + _log_add_timing("prepare", file_path, started, outcome="prepared") + return prepared + + +def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | 
None]: + final_raw = None + final_source = None + if result.raw_path is not None: + final_raw = kb_dir / "raw" / result.raw_path.name + if result.source_path is not None: + final_source = kb_dir / "wiki" / "sources" / result.source_path.name + return final_raw, final_source + + +def _snapshot_add_paths(kb_dir: Path, doc_name: str, final_raw: Path | None, final_source: Path | None) -> list[Path]: + paths = [ + kb_dir / ".openkb" / "hashes.json", + kb_dir / ".openkb" / "pageindex.db", + # SQLite may keep recently-written PageIndex state in sidecar files. + # Snapshot missing paths too, so rollback removes sidecars created + # during a failed long-document ingest. + kb_dir / ".openkb" / "pageindex.db-wal", + kb_dir / ".openkb" / "pageindex.db-shm", + kb_dir / ".openkb" / "pageindex.db-journal", + kb_dir / ".openkb" / "files", + kb_dir / "wiki" / "summaries" / f"{doc_name}.md", + kb_dir / "wiki" / "sources" / f"{doc_name}.json", + kb_dir / "wiki" / "sources" / "images" / doc_name, + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / "wiki" / "index.md", + kb_dir / "wiki" / "log.md", + ] + if final_raw is not None: + paths.append(final_raw) + if final_source is not None: + paths.append(final_source) + return paths + + +def _run_compile_with_retry(coro_factory, label: str) -> None: + """Run an async compile coroutine once, retrying once after 2s on failure. + + ``coro_factory`` builds a fresh coroutine each call (an ``asyncio.run``-consumed + coroutine can't be awaited twice). Raises on the second failure so the caller's + snapshot-rollback path runs. 
+ """ + logger = logging.getLogger(__name__) + click.echo(f" {label}...") + started = time.perf_counter() + for attempt in range(2): + try: + asyncio.run(coro_factory()) + _log_add_timing("compile", None, started, label=label) + return + except Exception as exc: + if attempt == 0: + click.echo(" Retrying compilation in 2s...") + time.sleep(2) + else: + click.echo(f" [ERROR] Compilation failed: {exc}") + logger.debug("Compilation traceback:", exc_info=True) + raise + + +def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _AddOutcome: + """Commit a prepared add while the caller holds the KB mutation lock.""" + from openkb.agent.compiler import compile_long_doc, compile_short_doc + from openkb.state import HashRegistry + + logger = logging.getLogger(__name__) + file_path = prepared.file_path + started = time.perf_counter() + + def finish(outcome: _AddOutcome) -> _AddOutcome: + _log_add_timing("commit", file_path, started, outcome=outcome) + return outcome + + if prepared.outcome == "failed": + click.echo(f" [ERROR] {prepared.error_stage} failed: {prepared.error}") + _cleanup_staging(prepared.staging_dir) + return finish("failed") + if prepared.outcome == "skipped": + click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") + _cleanup_staging(prepared.staging_dir) + return finish("skipped") + if prepared.result is None: + click.echo(f" [ERROR] Conversion failed: no result for {file_path.name}") + _cleanup_staging(prepared.staging_dir) + return finish("failed") + + result = prepared.result + registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") + if result.file_hash and registry.is_known(result.file_hash): + click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") + _cleanup_staging(prepared.staging_dir) + return finish("skipped") + + doc_name = result.doc_name or file_path.stem + norm_doc_name = unicodedata.normalize("NFKC", doc_name) + norm_file_name = unicodedata.normalize("NFKC", file_path.name) + path_key = 
_registry_path(file_path, kb_dir) + for existing_hash, meta in registry.all_entries().items(): + existing_name = meta.get("doc_name") or Path(meta.get("name", "")).stem + if ( + existing_hash != result.file_hash + and unicodedata.normalize("NFKC", existing_name) == norm_doc_name + ): + existing_path = meta.get("path") + # Same document iff it shares our path, OR it is a legacy + # (pre-path-index) entry with no path whose filename matches — the + # pre-collision form of this same document being re-ingested. A + # path-indexed entry must NOT be matched by filename alone: two + # different files can share a name, and a concurrent add could + # claim this doc_name in the window between reservation and commit. + # NFKC-normalize names: macOS reports filenames in NFD, so a raw + # `==` would mis-classify a same-document re-add as a conflict. + if existing_path == path_key or ( + not existing_path + and unicodedata.normalize("NFKC", meta.get("name") or "") == norm_file_name + ): + continue + click.echo( + " [ERROR] Document name conflict after parallel preparation: " + f"'{doc_name}' is already used by {meta.get('name', 'another document')}. " + "Re-run this file after the current batch." + ) + _cleanup_staging(prepared.staging_dir) + return finish("failed") + + final_raw, final_source = _final_artifact_paths(result, kb_dir) + snapshot: MutationSnapshot | None = None + index_result = None + + try: + # Take the snapshot inside the try: a failure here (disk-full copytree + # of concepts/entities, permission, relative_to ValueError) used to + # propagate uncaught — aborting the whole remaining batch and leaking + # an unjournaled backup dir. snapshot_paths now self-cleans its backup + # dir on partial failure, so when snapshot stays None there is nothing + # to roll back and the except branch handles that case. 
+ snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), + operation="add", + details={"file_hash": result.file_hash, "name": file_path.name, "doc_name": doc_name}, + ) + publish_staged_tree(prepared.staging_dir, kb_dir) + if final_raw is not None: + result.raw_path = final_raw + if final_source is not None: + result.source_path = final_source + + if result.is_long_doc: + click.echo(" Long document detected — indexing with PageIndex...") + index_started = time.perf_counter() + try: + from openkb.indexer import index_long_document + + index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name) + _log_add_timing("index", file_path, index_started, doc_name=doc_name) + except Exception as exc: + click.echo(f" [ERROR] Indexing failed: {exc}") + logger.debug("Indexing traceback:", exc_info=True) + raise + + summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + index_result.doc_id, + kb_dir, + model, + doc_description=index_result.description, + ), + label=f"Compiling long doc (doc_id={index_result.doc_id})", + ) + else: + _run_compile_with_retry( + lambda: compile_short_doc(doc_name, result.source_path, kb_dir, model), + label="Compiling short doc", + ) + + if result.file_hash: + # Reuse the conflict-scan registry (line above): still valid under + # the same lock — nothing mutated it between scan and write. 
+ doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") + meta = { + "name": file_path.name, + "doc_name": doc_name, + "type": doc_type, + "path": _registry_path(file_path, kb_dir), + } + if result.raw_path is not None: + meta["raw_path"] = _registry_path(result.raw_path, kb_dir) + if result.source_path is not None: + meta["source_path"] = _registry_path(result.source_path, kb_dir) + if index_result is not None: + meta["doc_id"] = index_result.doc_id + registry.remove_by_doc_name(doc_name) + for existing_hash, existing_meta in list(registry.all_entries().items()): + if ( + existing_hash != result.file_hash + and not existing_meta.get("doc_name") + and existing_meta.get("name") == file_path.name + ): + registry.remove_by_hash(existing_hash) + registry.add(result.file_hash, meta) + + # Commit point: the registry write is durable. Mark the journal + # committed so that any failure in the post-commit cleanup below + # (log append, backup removal) cannot trigger a recovery rollback + # that discards this completed ingest. mark_committed is the last + # step inside the try — together with registry.add it forms the + # atomic commit group, so a failure here correctly rolls back. + snapshot.mark_committed() + except Exception: + if snapshot is None: + # snapshot_paths failed before any mutation ran; it already removed + # its own backup dir, so there is nothing to roll back. + click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.") + return finish("failed") + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) + return finish("failed") + finally: + _cleanup_staging(prepared.staging_dir) + + # Post-commit side effects. 
These run only after the mutation is + # committed and the journal marked committed, so a failure here must NOT + # roll back — best-effort only. A stale "committed" journal left behind + # is harmless: the next recover_pending_journals sees status "committed" + # and discards it. + try: + append_log(kb_dir / "wiki", "ingest", file_path.name) + except Exception as exc: + logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc) + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo( + f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}" + ) + click.echo(f" [OK] {file_path.name} added to knowledge base.") + return finish("added") + + +@contextmanager +def _kb_mutation_lock(kb_dir: Path): + """Acquire the ingest lock for an add-path mutation. + + Journal draining lives in :func:`openkb.locks.kb_lock` now, so every + exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``, + ``chat`` — drains pending journals on first acquisition, not just the add + path. This wrapper remains the add pipeline's entry point (prepare/commit + run inside the lock); it no longer drains separately, since doing so would + double-scan and double-log on every per-file commit in a directory batch. + """ + started = time.perf_counter() with kb_ingest_lock(kb_dir / ".openkb"): - return _add_single_file_locked(file_path, kb_dir) + _log_add_timing("lock_wait", None, started) + yield + +def add_single_file(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome: + """Convert, index, and compile a single document under the KB mutation lock. + + ``stage=True`` converts into an isolated staging dir before the commit + snapshot, so a crash between convert and commit can't orphan raw/source + files in the live KB. Callers whose file already lives in ``raw/`` (watch + mode, URL fetch) pass ``stage=False`` to keep convert's in-place + optimization. 
+ """ + with _kb_mutation_lock(kb_dir): + return _add_single_file_locked(file_path, kb_dir, stage=stage) -def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: + +def _add_single_file_locked(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome: """Convert, index, and compile a single document into the knowledge base. Steps: @@ -297,106 +759,19 @@ def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", " be an orphan) while preserving it on failure so the user can retry without re-downloading. """ - from openkb.agent.compiler import compile_long_doc, compile_short_doc - from openkb.state import HashRegistry - - logger = logging.getLogger(__name__) openkb_dir = kb_dir / ".openkb" config = load_config(openkb_dir / "config.yaml") _setup_llm_key(kb_dir) model: str = config.get("model", DEFAULT_CONFIG["model"]) - # 2. Convert document click.echo(f"Adding: {file_path.name}") - try: - result = convert_document(file_path, kb_dir) - except Exception as exc: - click.echo(f" [ERROR] Conversion failed: {exc}") - logger.debug("Conversion traceback:", exc_info=True) - return "failed" - - if result.skipped: - click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") - return "skipped" - - doc_name = result.doc_name or file_path.stem - index_result = None # populated only on the long-doc branch - - # 3/4. 
Index and compile - if result.is_long_doc: - click.echo(f" Long document detected — indexing with PageIndex...") - try: - from openkb.indexer import index_long_document - index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name) - except Exception as exc: - click.echo(f" [ERROR] Indexing failed: {exc}") - logger.debug("Indexing traceback:", exc_info=True) - return "failed" - - summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...") - for attempt in range(2): - try: - asyncio.run( - compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model, - doc_description=index_result.description) - ) - break - except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - else: - click.echo(f" Compiling short doc...") - for attempt in range(2): - try: - asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model)) - break - except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - - # Register hash only after successful compilation - if result.file_hash: - # Construct the registry NOW, not earlier: convert_document may have - # backfilled a legacy entry (doc_name/path) on disk via its own - # instance, and an earlier snapshot would clobber that backfill on - # the full rewrite in add(). 
- registry = HashRegistry(openkb_dir / "hashes.json") - doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") - meta = { - "name": file_path.name, - "doc_name": doc_name, - "type": doc_type, - "path": _registry_path(file_path, kb_dir), - } - if result.raw_path is not None: - meta["raw_path"] = _registry_path(result.raw_path, kb_dir) - if result.source_path is not None: - meta["source_path"] = _registry_path(result.source_path, kb_dir) - # For long PDFs we also persist the PageIndex doc_id so `openkb - # remove` can later call ``Collection.delete_document(doc_id)`` - # to free the managed PDF copy + SQLite row. - if index_result is not None: - meta["doc_id"] = index_result.doc_id - # An edited document arrives with a new content hash; drop the - # stale entry for the same doc_name so the registry keeps exactly - # one entry per document. - registry.remove_by_doc_name(doc_name) - registry.add(result.file_hash, meta) - - append_log(kb_dir / "wiki", "ingest", file_path.name) - click.echo(f" [OK] {file_path.name} added to knowledge base.") - return "added" + # Stage unless the file already lives in raw/ (watch/URL), where convert's + # in-place optimization applies. Staging keeps convert's writes out of the + # live KB so a crash between convert and the commit snapshot can't orphan + # raw/source files with no journal to recover them. 
+ staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None + prepared = _prepare_add_file(file_path, kb_dir, staging_dir=staging_dir) + return _commit_prepared_add(prepared, kb_dir, model) # --------------------------------------------------------------------------- @@ -602,6 +977,7 @@ def init(model, language): "model": model, "language": language, "pageindex_threshold": DEFAULT_CONFIG["pageindex_threshold"], + "file_processing_jobs": DEFAULT_CONFIG["file_processing_jobs"], } save_config(openkb_dir / "config.yaml", config) atomic_write_json(openkb_dir / "hashes.json", {}) @@ -625,7 +1001,6 @@ def init(model, language): @cli.command() @click.argument("path") @click.pass_context -@_with_kb_lock(exclusive=True) def add(ctx, path): """Add a document or directory of documents at PATH to the knowledge base. @@ -647,10 +1022,11 @@ def add(ctx, path): # that the registry can't reach via openkb remove. from openkb.url_ingest import looks_like_url, fetch_url_to_raw if looks_like_url(path): - fetched = fetch_url_to_raw(path, kb_dir) - if fetched is None: - return - outcome = add_single_file(fetched, kb_dir) + with _kb_mutation_lock(kb_dir): + fetched = fetch_url_to_raw(path, kb_dir) + if fetched is None: + return + outcome = _add_single_file_locked(fetched, kb_dir, stage=False) # Only clean up on dedup-skip. On "failed" we keep the file so # the user can retry (e.g. 
transient LLM error during compile) # without re-downloading — and so they don't lose data when @@ -674,9 +1050,61 @@ def add(ctx, path): return total = len(files) click.echo(f"Found {total} supported file(s) in {path}.") - for i, f in enumerate(files, 1): - click.echo(f"\n[{i}/{total}] ", nl=False) - add_single_file(f, kb_dir) + config = load_config(kb_dir / ".openkb" / "config.yaml") + jobs = min( + total, + _positive_int( + config.get("file_processing_jobs"), + DEFAULT_CONFIG["file_processing_jobs"], + ), + ) + _setup_llm_key(kb_dir) + model = config.get("model", DEFAULT_CONFIG["model"]) + files_to_prepare, prepared_outcomes = _prefilter_known_files(files, kb_dir, jobs) + with _kb_mutation_lock(kb_dir): + reserved_doc_names = _reserve_batch_doc_names(files_to_prepare, kb_dir) + if jobs == 1: + for i, f in enumerate(files, 1): + if f in prepared_outcomes: + prepared = prepared_outcomes[f] + click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}") + else: + click.echo(f"\n[{i}/{total}] ", nl=False) + # Stage exactly like the multi-worker path: prepare must never + # write the live KB unlocked. staging_dir=None would make + # convert_document write raw/ and wiki/sources/ straight into + # kb_dir with no lock and no mutation journal (orphan on crash). + prepared = _prepare_add_file( + f, + kb_dir, + staging_dir=_staging_dir_for(kb_dir, f), + doc_name=reserved_doc_names[f], + ) + with _kb_mutation_lock(kb_dir): + _commit_prepared_add(prepared, kb_dir, model) + return + + click.echo(f"Preparing files with {jobs} worker(s).") + with ThreadPoolExecutor(max_workers=jobs) as executor: + futures = { + f: executor.submit( + _prepare_add_file, + f, + kb_dir, + _staging_dir_for(kb_dir, f), + reserved_doc_names[f], + ) + for f in files_to_prepare + } + # Commit in scan order even though prepare finishes out of order. + # That keeps log.md and CLI progress stable for human audit. 
+ for i, f in enumerate(files, 1): + prepared = prepared_outcomes.get(f) + if prepared is None: + prepared = futures[f].result() + click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}") + with _kb_mutation_lock(kb_dir): + _commit_prepared_add(prepared, kb_dir, model) else: if target.suffix.lower() not in SUPPORTED_EXTENSIONS: click.echo( @@ -1412,7 +1840,7 @@ def on_new_files(paths): f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}" ) continue - add_single_file(fp, kb_dir) + add_single_file(fp, kb_dir, stage=False) click.echo(f"Watching {raw_dir} for new documents. Press Ctrl+C to stop.") watch_directory(raw_dir, on_new_files) diff --git a/openkb/config.py b/openkb/config.py index 9d0d6cd4..4c44addd 100644 --- a/openkb/config.py +++ b/openkb/config.py @@ -16,6 +16,7 @@ "model": "gpt-5.4-mini", "language": "en", "pageindex_threshold": 20, + "file_processing_jobs": 2, } # Default entity-type vocabulary. Overridable per-KB via the optional diff --git a/openkb/converter.py b/openkb/converter.py index 9ebac762..b5c2add6 100644 --- a/openkb/converter.py +++ b/openkb/converter.py @@ -14,6 +14,7 @@ from openkb.config import load_config from openkb.images import copy_relative_images, extract_base64_images, convert_pdf_with_images +from openkb.locks import atomic_write_text, kb_ingest_lock from openkb.state import HashRegistry logger = logging.getLogger(__name__) @@ -29,6 +30,7 @@ class ConvertResult: skipped: bool = False file_hash: str | None = None # For deferred hash registration doc_name: str | None = None # Stable wiki name (collision-resistant) + staging_dir: Path | None = None def _registry_path(path: Path, kb_dir: Path) -> str: @@ -70,7 +72,13 @@ def _name_taken(candidate: str, registry: HashRegistry) -> bool: return False -def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: +def resolve_doc_name( + src: Path, + kb_dir: Path, + registry: HashRegistry, + *, + persist_legacy: bool = True, +) -> str: """Resolve the stable wiki 
name for ``src`` (Scheme A). Identity is keyed by path: a source we've seen before (same path, even @@ -93,9 +101,10 @@ def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: file_hash, meta = legacy meta = dict(meta) name = meta.get("doc_name") or Path(meta.get("name", "")).stem - meta["doc_name"] = name - meta["path"] = path_key - registry.add(file_hash, meta) # backfill + persist + if persist_legacy: + meta["doc_name"] = name + meta["path"] = path_key + registry.add(file_hash, meta) # backfill + persist return name candidate = _sanitize_stem(src.stem) @@ -111,7 +120,14 @@ def get_pdf_page_count(path: Path) -> int: return doc.page_count -def convert_document(src: Path, kb_dir: Path) -> ConvertResult: +def convert_document( + src: Path, + kb_dir: Path, + *, + assume_locked: bool = False, + staging_dir: Path | None = None, + doc_name_override: str | None = None, +) -> ConvertResult: """Convert a document and integrate it into the knowledge base. Steps: @@ -122,12 +138,23 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: 5. Otherwise — run MarkItDown, extract base64 images, save to ``wiki/sources/``. 6. Register hash in the registry. 
""" + if not assume_locked: + with kb_ingest_lock(kb_dir / ".openkb"): + return convert_document( + src, + kb_dir, + assume_locked=True, + staging_dir=staging_dir, + doc_name_override=doc_name_override, + ) + # ------------------------------------------------------------------ # Load config & state # ------------------------------------------------------------------ openkb_dir = kb_dir / ".openkb" config = load_config(openkb_dir / "config.yaml") threshold: int = config.get("pageindex_threshold", 20) + artifact_root = staging_dir if staging_dir is not None else kb_dir registry = HashRegistry(openkb_dir / "hashes.json") # ------------------------------------------------------------------ @@ -141,15 +168,23 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: skipped=True, file_hash=file_hash, doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, + staging_dir=staging_dir, ) - doc_name = resolve_doc_name(src, kb_dir, registry) + # Batch ingest reserves doc_names before parallel conversion so same-stem + # files behave like serial adds while still writing to isolated staging dirs. + doc_name = doc_name_override or resolve_doc_name( + src, + kb_dir, + registry, + persist_legacy=staging_dir is None, + ) # ------------------------------------------------------------------ # 2. Copy to raw/ # ------------------------------------------------------------------ - raw_dir = kb_dir / "raw" + raw_dir = artifact_root / "raw" raw_dir.mkdir(parents=True, exist_ok=True) - if src.resolve().is_relative_to(raw_dir.resolve()): + if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()): # Watch mode: the file already lives in raw/ — don't copy/rename. raw_dest = src else: @@ -173,14 +208,15 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: is_long_doc=True, file_hash=file_hash, doc_name=doc_name, + staging_dir=staging_dir, ) # ------------------------------------------------------------------ # 4/5. 
Convert to Markdown # ------------------------------------------------------------------ - sources_dir = kb_dir / "wiki" / "sources" + sources_dir = artifact_root / "wiki" / "sources" sources_dir.mkdir(parents=True, exist_ok=True) - images_dir = kb_dir / "wiki" / "sources" / "images" / doc_name + images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name images_dir.mkdir(parents=True, exist_ok=True) if src.suffix.lower() == ".md": @@ -197,11 +233,12 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: markdown = extract_base64_images(markdown, doc_name, images_dir) dest_md = sources_dir / f"{doc_name}.md" - dest_md.write_text(markdown, encoding="utf-8") + atomic_write_text(dest_md, markdown) return ConvertResult( raw_path=raw_dest, source_path=dest_md, file_hash=file_hash, doc_name=doc_name, + staging_dir=staging_dir, ) diff --git a/openkb/locks.py b/openkb/locks.py index 2fa2815a..72966fc1 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -9,6 +9,7 @@ import contextlib import json +import logging import os import tempfile import threading @@ -97,6 +98,32 @@ def _local_lock(lock_path: Path) -> _LocalRwLock: return lock +def _drain_pending_journals(openkb_dir: Path) -> None: + """Roll back any mutation journals an interrupted process left behind. + + Draining recovery is part of *taking* the mutation lock, not part of any + one command: a process that acquires the exclusive lock must restore the + KB to a known state before mutating it. Wiring this into ``kb_lock`` means + every exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``, + ``chat`` — drains on first acquisition, so an ``add`` that crashed mid- + commit cannot leave an active journal on disk that a later ``add`` rolls + back over the top of an intervening ``remove``/``recompile`` (clobbering + those edits). 
``openkb_dir`` is the ``kb_dir/.openkb`` directory callers + pass to ``kb_lock``; the journal lives at ``openkb_dir/journal``, so the + KB root is ``openkb_dir.parent``. + + The delayed import breaks the ``locks`` ↔ ``mutation`` cycle (``mutation`` + imports atomic-write helpers from this module at top level). Called only + on first OS-lock acquisition (the reentrant branch above returns early), + never on a read lock, so queries pay nothing. + """ + from openkb.mutation import recover_pending_journals + + log = logging.getLogger(__name__) + for message in recover_pending_journals(openkb_dir.parent): + log.warning(message) + + @contextlib.contextmanager def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: """Hold a KB-level advisory lock.""" @@ -134,6 +161,8 @@ def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: flock(fh, exclusive=exclusive) held[resolved] = (1, 0) if exclusive else (0, 1) try: + if exclusive: + _drain_pending_journals(openkb_dir) yield finally: held.pop(resolved, None) diff --git a/openkb/mutation.py b/openkb/mutation.py new file mode 100644 index 00000000..cdac260b --- /dev/null +++ b/openkb/mutation.py @@ -0,0 +1,216 @@ +"""Transactional helpers for KB mutation paths.""" +from __future__ import annotations + +import json +import logging +import shutil +import uuid +from dataclasses import dataclass, field +from pathlib import Path + +from openkb.locks import atomic_write_bytes, atomic_write_json + +logger = logging.getLogger(__name__) + + +def _copy_file_atomic(src: Path, dest: Path) -> None: + dest.parent.mkdir(parents=True, exist_ok=True) + atomic_write_bytes(dest, src.read_bytes()) + + +@dataclass +class MutationSnapshot: + """Snapshot of final KB paths touched by a mutation attempt.""" + + kb_dir: Path + backup_dir: Path + journal_path: Path + operation: str + details: dict = field(default_factory=dict) + entries: dict[Path, Path | None] = field(default_factory=dict) + + def _journal_data(self, status: str) 
-> dict: + return { + "version": 1, + "operation": self.operation, + "status": status, + "kb_dir": str(self.kb_dir), + "backup_dir": str(self.backup_dir), + "details": self.details, + "entries": [ + { + "target": str(target), + "backup": str(backup) if backup is not None else None, + } + for target, backup in self.entries.items() + ], + } + + def write_journal(self, status: str) -> None: + self.journal_path.parent.mkdir(parents=True, exist_ok=True) + atomic_write_json(self.journal_path, self._journal_data(status)) + + def mark_committed(self) -> None: + """Mark the journal committed without removing the backup. + + Call this the instant the mutation is durably applied (e.g. the + registry write has landed) so a subsequent + :func:`recover_pending_journals` discards the journal instead of + rolling it back. This is the commit signal; :meth:`discard` is the + post-commit cleanup that also removes the backup dir and journal + file and must itself be best-effort — it runs *after* the commit + point and its failure must never trigger a rollback. + """ + self.write_journal("committed") + + def rollback(self) -> None: + # Restore children before parents so directory deletes cannot remove + # paths that still need to be restored from a more specific backup. + for target, backup in sorted( + self.entries.items(), + key=lambda item: len(item[0].parts), + reverse=True, + ): + # Removal is unconditional; the backup (if any) is then restored + # in its place. 
+ if target.is_dir(): + shutil.rmtree(target, ignore_errors=True) + else: + target.unlink(missing_ok=True) + if backup is None: + continue + target.parent.mkdir(parents=True, exist_ok=True) + if backup.is_dir(): + shutil.copytree(backup, target) + else: + _copy_file_atomic(backup, target) + self.write_journal("rolled_back") + + def rollback_best_effort(self) -> Exception | None: + try: + self.rollback() + except Exception as exc: + logger.warning("Mutation rollback failed: %s", exc) + return exc + return None + + def discard(self) -> None: + # Best-effort post-commit/post-rollback cleanup: callers have already + # written a terminal status (mark_committed or rollback), so there is + # nothing to re-write here — doing so would be dead work and would + # silently downgrade a "rolled_back" journal to "committed" moments + # before it is deleted. + shutil.rmtree(self.backup_dir, ignore_errors=True) + self.journal_path.unlink(missing_ok=True) + + def discard_best_effort(self) -> Exception | None: + try: + self.discard() + except Exception as exc: + logger.warning("Mutation journal cleanup failed: %s", exc) + return exc + return None + + +def snapshot_paths( + kb_dir: Path, + paths: list[Path], + *, + operation: str, + details: dict | None = None, +) -> MutationSnapshot: + """Snapshot final KB paths before a mutation starts.""" + kb_dir = kb_dir.resolve() + journal_id = uuid.uuid4().hex + backup_dir = kb_dir / ".openkb" / "staging" / f"rollback-{journal_id}" + backup_dir.mkdir(parents=True, exist_ok=False) + snapshot = MutationSnapshot( + kb_dir=kb_dir, + backup_dir=backup_dir, + journal_path=kb_dir / ".openkb" / "journal" / f"{journal_id}.json", + operation=operation, + details=details or {}, + ) + try: + for path in paths: + target = path.resolve() + if target in snapshot.entries: + continue + if not target.exists(): + snapshot.entries[target] = None + continue + rel = target.relative_to(kb_dir) + backup = backup_dir / rel + backup.parent.mkdir(parents=True, 
exist_ok=True) + if target.is_dir(): + shutil.copytree(target, backup) + else: + shutil.copy2(target, backup) + snapshot.entries[target] = backup + # The active journal is the recovery signal: once this exists, a future + # process can restore every recorded target even if the current one exits. + snapshot.write_journal("active") + except Exception: + # Partial snapshot: backup_dir exists on disk but no journal was + # written. recover_pending_journals only scans journals, so remove the + # orphan backup here — otherwise it leaks forever with nothing able to + # reach or clean it. + shutil.rmtree(backup_dir, ignore_errors=True) + raise + return snapshot + + +def _snapshot_from_journal(path: Path, data: dict) -> MutationSnapshot: + snapshot = MutationSnapshot( + kb_dir=Path(data["kb_dir"]), + backup_dir=Path(data["backup_dir"]), + journal_path=path, + operation=data.get("operation", "mutation"), + details=data.get("details") or {}, + ) + snapshot.entries = { + Path(item["target"]): Path(item["backup"]) if item.get("backup") else None + for item in data.get("entries", []) + } + return snapshot + + +def recover_pending_journals(kb_dir: Path) -> list[str]: + """Rollback active journals left by an interrupted process.""" + journal_dir = kb_dir / ".openkb" / "journal" + if not journal_dir.is_dir(): + return [] + messages: list[str] = [] + for journal_path in sorted(journal_dir.glob("*.json")): + try: + data = json.loads(journal_path.read_text(encoding="utf-8")) + snapshot = _snapshot_from_journal(journal_path, data) + status = data.get("status", "active") + if status in {"committed", "rolled_back"}: + snapshot.discard() + messages.append(f"Cleaned terminal mutation journal {journal_path.name}.") + continue + snapshot.rollback() + snapshot.discard() + messages.append( + f"Rolled back interrupted {snapshot.operation} journal {journal_path.name}." 
+ ) + except Exception as exc: + messages.append( + f"Could not recover journal {journal_path.name}: {type(exc).__name__}: {exc}" + ) + return messages + + +def publish_staged_tree(staging_dir: Path | None, kb_dir: Path) -> None: + """Copy staged raw/source artifacts into their final KB locations.""" + if staging_dir is None or not staging_dir.exists(): + return + for rel in ("raw", "wiki/sources"): + src_root = staging_dir / rel + if not src_root.exists(): + continue + for src in src_root.rglob("*"): + if not src.is_file(): + continue + _copy_file_atomic(src, kb_dir / rel / src.relative_to(src_root)) diff --git a/openkb/state.py b/openkb/state.py index 10cb20cc..c7302770 100644 --- a/openkb/state.py +++ b/openkb/state.py @@ -13,12 +13,29 @@ class HashRegistry: def __init__(self, path: Path) -> None: self._path = path + self._persist_enabled = True if path.exists(): with path.open("r", encoding="utf-8") as fh: self._data: dict[str, dict] = json.load(fh) else: self._data = {} + @classmethod + def memory(cls, entries: dict[str, dict]) -> "HashRegistry": + """An in-memory view over ``entries`` that never writes back to disk. + + Shares the on-disk registry's read/resolve contract (``get_by_path``, + ``find_legacy_by_stem``, ``all_entries``, ``add``) so callers that only + need to stage mutations in memory — batch doc_name reservation — + reuse the same code path instead of a parallel reimplementation. + ``add`` updates the in-memory dict but skips persistence. 
+ """ + reg = cls.__new__(cls) + reg._path = None + reg._persist_enabled = False + reg._data = {key: dict(value) for key, value in entries.items()} + return reg + # ------------------------------------------------------------------ # Query helpers # ------------------------------------------------------------------ @@ -115,6 +132,8 @@ def remove_by_hash(self, file_hash: str) -> bool: # ------------------------------------------------------------------ def _persist(self) -> None: + if not self._persist_enabled: + return atomic_write_json(self._path, self._data) # ------------------------------------------------------------------ diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 0199c9e2..f4f2f68c 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -80,17 +80,67 @@ def test_add_directory_calls_helper_for_each_file(self, tmp_path): (docs_dir / "b.txt").write_text("B content") (docs_dir / "ignore.xyz").write_text("skip me") + from openkb.cli import _PreparedAdd + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir) + runner = CliRunner() - with patch("openkb.cli.add_single_file") as mock_add, \ + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \ + patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \ patch("openkb.cli._find_kb_dir", return_value=kb_dir): runner.invoke(cli, ["add", str(docs_dir)]) - # Should be called for .md and .txt but not .xyz - assert mock_add.call_count == 2 - called_names = {call.args[0].name for call in mock_add.call_args_list} + # Should be prepared/committed for .md and .txt but not .xyz + assert mock_prepare.call_count == 2 + assert mock_commit.call_count == 2 + called_names = {call.args[0].name for call in mock_prepare.call_args_list} assert "a.md" in called_names assert "b.txt" in called_names assert "ignore.xyz" not in called_names + def 
test_add_directory_prefilters_known_hashes_before_prepare(self, tmp_path): + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + known = docs_dir / "known.md" + unknown = docs_dir / "unknown.md" + known.write_text("# Known", encoding="utf-8") + unknown.write_text("# Unknown", encoding="utf-8") + + from openkb.cli import _PreparedAdd + from openkb.state import HashRegistry + + HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + HashRegistry.hash_file(known), + { + "name": "known.md", + "doc_name": "known", + "type": "md", + "path": "docs/known.md", + }, + ) + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir) + + runner = CliRunner() + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \ + patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \ + patch("openkb.cli._find_kb_dir", return_value=kb_dir): + result = runner.invoke(cli, ["add", str(docs_dir)]) + + assert result.exception is None + assert [call.args[0].name for call in mock_prepare.call_args_list] == ["unknown.md"] + assert [call.args[0].file_path.name for call in mock_commit.call_args_list] == [ + "known.md", + "unknown.md", + ] + assert mock_commit.call_args_list[0].args[0].outcome == "skipped" + def test_add_unsupported_extension(self, tmp_path): kb_dir = self._setup_kb(tmp_path) doc = tmp_path / "file.xyz" @@ -168,6 +218,45 @@ def test_add_short_doc_runs_compiler(self, tmp_path): assert "path" in meta assert "stale-old-hash" not in hashes + def test_commit_keeps_journal_when_rollback_fails(self, tmp_path): + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + source_path = kb_dir / "wiki" / "sources" / 
"broken.md" + source_path.write_text("# Broken", encoding="utf-8") + prepared = _PreparedAdd( + file_path=tmp_path / "broken.md", + result=ConvertResult( + raw_path=kb_dir / "raw" / "broken.md", + source_path=source_path, + file_hash="beadfeed00" * 8, + doc_name="broken", + ), + ) + + class FakeSnapshot: + journal_path = kb_dir / ".openkb" / "journal" / "broken.json" + + def __init__(self): + self.discard_called = False + + def rollback_best_effort(self): + return RuntimeError("rollback failed") + + def discard_best_effort(self): + self.discard_called = True + + fake_snapshot = FakeSnapshot() + + with patch("openkb.cli.snapshot_paths", return_value=fake_snapshot), \ + patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "failed" + assert fake_snapshot.discard_called is False + def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path): """Editing a pre-doc_name-era document must not fork the registry. 
@@ -203,3 +292,407 @@ def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path): new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"] assert len(new_entries) == 1 # …exactly one entry survives assert new_entries[0]["path"] # with path identity persisted + + def test_add_directory_legacy_entry_converges_to_single_entry(self, tmp_path): + import json as json_mod + + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "old-hash", {"name": "notes.md", "type": "md"} + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "notes.md").write_text("# Notes, edited", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, ["add", str(docs_dir)]) + assert "OK" in result.output + + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + assert "old-hash" not in hashes + new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"] + assert len(new_entries) == 1 + assert new_entries[0]["path"] + + def test_add_directory_same_stem_files_get_reserved_names(self, tmp_path): + import json as json_mod + + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + (docs_dir / "a").mkdir(parents=True) + (docs_dir / "b").mkdir(parents=True) + (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8") + (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, 
["add", str(docs_dir)]) + assert "Document name conflict" not in result.output + assert result.output.count("[OK]") == 2 + + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + doc_names = {meta["doc_name"] for meta in hashes.values()} + assert len(doc_names) == 2 + assert "report" in doc_names + assert any(name.startswith("report-") for name in doc_names) + + def test_add_directory_same_stem_with_legacy_entry_no_duplicate(self, tmp_path): + """Two same-stem files plus a legacy (path-less) entry must not both + reserve the legacy doc_name. ``find_legacy_by_stem`` must be consumed + (idempotent) across the batch so the second file gets a suffixed name + instead of colliding with the first. + """ + import json as json_mod + + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 2\n", + encoding="utf-8", + ) + # Legacy entry: name + doc_name but NO path → find_legacy_by_stem matches "report". 
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "legacy-hash", {"name": "report.md", "doc_name": "report", "type": "md"} + ) + docs_dir = tmp_path / "docs" + (docs_dir / "a").mkdir(parents=True) + (docs_dir / "b").mkdir(parents=True) + (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8") + (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, ["add", str(docs_dir)]) + assert "Document name conflict" not in result.output + assert result.output.count("[OK]") == 2 + + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + report_names = [ + m["doc_name"] for m in hashes.values() if str(m.get("doc_name", "")).startswith("report") + ] + assert len(report_names) == 2 + assert len(set(report_names)) == 2 # no silent overwrite + + def test_commit_rejects_same_filename_different_path_conflict(self, tmp_path): + """A path-indexed entry sharing doc_name + filename but with a + different path (a concurrent add of a same-named file in the + reservation/commit window) must be rejected, not silently + overwritten via the filename escape. + """ + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + source_path = kb_dir / "wiki" / "sources" / "report.md" + source_path.parent.mkdir(parents=True, exist_ok=True) + source_path.write_text("# mine", encoding="utf-8") + + # A DIFFERENT document already owns doc_name "report": different path, + # different hash, same filename. 
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "other-hash", + {"name": "report.md", "doc_name": "report", "type": "md", + "path": "elsewhere/report.md"}, + ) + + prepared = _PreparedAdd( + file_path=tmp_path / "report.md", + result=ConvertResult( + raw_path=kb_dir / "raw" / "report.md", + source_path=source_path, + file_hash="myhash" + "0" * 59, + doc_name="report", + ), + ) + + with patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "failed" + hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")) + # The pre-existing document is untouched. + assert "other-hash" in hashes + assert hashes["other-hash"]["path"] == "elsewhere/report.md" + assert "myhash" + "0" * 59 not in hashes + + def test_add_directory_jobs1_stages_each_file(self, tmp_path): + """jobs==1 must stage each file (pass a real staging_dir) instead of + writing the live KB unlocked via staging_dir=None. 
+ """ + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 1\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "a.md").write_text("# A", encoding="utf-8") + (docs_dir / "b.md").write_text("# B", encoding="utf-8") + + from openkb.cli import _PreparedAdd + + seen_staging: list = [] + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + seen_staging.append(staging_dir) + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir) + + runner = CliRunner() + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \ + patch("openkb.cli._commit_prepared_add", return_value="added"), \ + patch("openkb.cli._find_kb_dir", return_value=kb_dir): + runner.invoke(cli, ["add", str(docs_dir)]) + + assert len(seen_staging) == 2 + assert all(s is not None for s in seen_staging) # regression: was None + + def test_commit_returns_added_when_post_commit_cleanup_fails(self, tmp_path): + """Once the registry write lands, a failure in journal cleanup must + NOT roll back the completed ingest (regression: the discard/log used + to live inside the rollback try). 
+ """ + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + source_path = kb_dir / "wiki" / "sources" / "ok.md" + source_path.parent.mkdir(parents=True, exist_ok=True) + source_path.write_text("# OK", encoding="utf-8") + prepared = _PreparedAdd( + file_path=tmp_path / "ok.md", + result=ConvertResult( + raw_path=kb_dir / "raw" / "ok.md", + source_path=source_path, + file_hash="ok" + "0" * 62, + doc_name="ok", + ), + ) + + class FakeSnapshot: + journal_path = kb_dir / ".openkb" / "journal" / "ok.json" + + def mark_committed(self): + pass + + def rollback_best_effort(self): + return None + + def discard_best_effort(self): + return RuntimeError("cleanup failed") + + with patch("openkb.cli.snapshot_paths", return_value=FakeSnapshot()), \ + patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "added" # regression: was "failed" (rolled back success) + hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")) + assert "ok" + "0" * 62 in hashes # registry write survived + + def test_commit_rolls_back_real_snapshot_on_compile_failure(self, tmp_path): + """End-to-end rollback: a REAL snapshot + REAL publish, then a compile + failure, must restore the KB to its pre-add state — published raw and + source files removed, registry unchanged, no orphaned artifacts or + journal. The FakeSnapshot-based test cannot exercise this transactional + guarantee (the whole reason the feature exists). 
+ """ + import json as json_mod + + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + pre_hashes = (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + + # A staging dir holding the converted artifacts that publish_staged_tree + # copies into the live KB before compile runs. + staging = tmp_path / "staging" + (staging / "raw").mkdir(parents=True) + (staging / "wiki" / "sources").mkdir(parents=True) + (staging / "raw" / "boom.md").write_text("# raw", encoding="utf-8") + source_md = staging / "wiki" / "sources" / "boom.md" + source_md.write_text("# converted", encoding="utf-8") + + prepared = _PreparedAdd( + file_path=tmp_path / "boom.md", + result=ConvertResult( + raw_path=staging / "raw" / "boom.md", + source_path=source_md, + file_hash="boom" + "0" * 60, + doc_name="boom", + ), + staging_dir=staging, + ) + + with patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")), \ + patch("openkb.cli.time.sleep"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + assert outcome == "failed" + # Published artifacts were rolled back (removed). + assert not (kb_dir / "raw" / "boom.md").exists() + assert not (kb_dir / "wiki" / "sources" / "boom.md").exists() + assert not (kb_dir / "wiki" / "summaries" / "boom.md").exists() + # Registry restored to pre-add state; no leaked boom entry. + hashes = json_mod.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + assert hashes == json.loads(pre_hashes) + assert "boom" + "0" * 60 not in hashes + # No orphan journal/backup left behind; staging cleaned up. 
+ assert not any((kb_dir / ".openkb" / "journal").glob("*.json")) + assert not staging.exists() + + def test_add_snapshot_rolls_back_pageindex_sqlite_sidecars(self, tmp_path): + """Long-doc failures must not leave SQLite sidecars newer than pageindex.db.""" + from openkb.cli import _snapshot_add_paths + from openkb.mutation import snapshot_paths + + kb_dir = self._setup_kb(tmp_path) + openkb_dir = kb_dir / ".openkb" + (openkb_dir / "pageindex.db").write_bytes(b"before") + + snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, "long", None, None), + operation="add", + details={}, + ) + + for suffix in ("-wal", "-shm", "-journal"): + (openkb_dir / f"pageindex.db{suffix}").write_bytes(b"after") + + snapshot.rollback() + snapshot.discard() + + assert (openkb_dir / "pageindex.db").read_bytes() == b"before" + for suffix in ("-wal", "-shm", "-journal"): + assert not (openkb_dir / f"pageindex.db{suffix}").exists() + + def test_add_single_file_stages_unless_file_already_in_raw(self, tmp_path): + """stage=True (default for single-file add / chat) routes convert + through an isolated staging dir; stage=False (watch / URL, file + already in raw/) keeps convert's in-place path. 
The staging default + closes the crash-orphan window for files that don't already live in + raw/.""" + from openkb.cli import _PreparedAdd, _add_single_file_locked + + kb_dir = self._setup_kb(tmp_path) + doc = tmp_path / "test.md" + doc.write_text("# hi", encoding="utf-8") + + captured: list = [] + + def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None): + captured.append(staging_dir) + return _PreparedAdd(file_path=file_path, staging_dir=staging_dir, outcome="skipped") + + with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \ + patch("openkb.cli._commit_prepared_add", return_value="skipped"): + _add_single_file_locked(doc, kb_dir) # default stage=True + _add_single_file_locked(doc, kb_dir, stage=False) + + assert captured[0] is not None # staged by default → no live-KB write pre-snapshot + assert captured[1] is None # in-place for watch/URL (file already in raw/) + + def test_commit_conflict_guard_normalizes_unicode_filenames(self, tmp_path): + """A legacy (path-less) entry whose name is stored NFC must match a + file whose name the filesystem reports as NFD (macOS HFS+/APFS), so a + same-document re-add is allowed instead of mis-reported as a conflict. 
+ The guard NFKC-normalizes both sides; a raw ``==`` would diverge.""" + from openkb.cli import _PreparedAdd, _commit_prepared_add + from openkb.converter import ConvertResult + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + import unicodedata as _ud + nfc_name = "r\u00e9sum\u00e9.pdf" # NFC: é = U+00E9 (composed) + nfd_name = _ud.normalize("NFD", nfc_name) # NFD: e + U+0301 (decomposed) + assert nfc_name != nfd_name # raw bytes differ + + source_path = kb_dir / "wiki" / "sources" / "resume.md" + source_path.parent.mkdir(parents=True, exist_ok=True) + source_path.write_text("# cv", encoding="utf-8") + + HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "legacy-hash", {"name": nfc_name, "doc_name": "resume", "type": "pdf"} + ) + + prepared = _PreparedAdd( + file_path=tmp_path / nfd_name, + result=ConvertResult( + raw_path=kb_dir / "raw" / "resume.pdf", + source_path=source_path, + file_hash="new" + "0" * 61, + doc_name="resume", + ), + ) + + with patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run"): + outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini") + + # NFC-vs-NFD is the same document, not a conflict → ingest proceeds. 
+ assert outcome == "added" + + def test_add_directory_jobs_gt1_runs_real_pipeline(self, tmp_path): + """End-to-end test of the jobs>1 ThreadPoolExecutor path. + + The other jobs>1 tests mock _prepare_add_file and _commit_prepared_add, so the + real concurrent branch (futures submitted in scan order, _staging_dir_for + allocation, the prepared_outcomes.get(f) or futures[f].result() fallback, + publish_staged_tree, registry writes, staging cleanup) is never executed. Here + prepare and commit are real and only the LLM compile is mocked, so that path runs. + """ + kb_dir = self._setup_kb(tmp_path) + (kb_dir / ".openkb" / "config.yaml").write_text( + "model: gpt-4o-mini\nfile_processing_jobs: 3\n", + encoding="utf-8", + ) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + for letter in ("a", "b", "c"): + (docs_dir / f"{letter}.md").write_text(f"# {letter}", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ + patch("openkb.cli.asyncio.run"): + result = runner.invoke(cli, ["add", str(docs_dir)]) + + assert result.exception is None, result.output + assert result.output.count("[OK]") == 3 + hashes = json.loads( + (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8") + ) + assert len(hashes) == 3 + assert {meta["doc_name"] for meta in hashes.values()} == {"a", "b", "c"} + # Staging dirs cleaned up after each commit. + staging = kb_dir / ".openkb" / "staging" + if staging.exists(): + assert not any(p.name.startswith("add-") for p in staging.iterdir()) + # Source artifacts published from staging into the live KB.
+ for letter in ("a", "b", "c"): + assert (kb_dir / "wiki" / "sources" / f"{letter}.md").exists() diff --git a/tests/test_mutation.py b/tests/test_mutation.py new file mode 100644 index 00000000..1169c967 --- /dev/null +++ b/tests/test_mutation.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import pytest + +from openkb.mutation import recover_pending_journals, snapshot_paths + + +def test_recover_pending_add_journal_rolls_back_files(tmp_path): + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + new_file = kb_dir / "wiki" / "sources" / "doc.md" + + snapshot_paths( + kb_dir, + [target, new_file], + operation="add", + details={"doc_name": "doc"}, + ) + target.write_text("after", encoding="utf-8") + new_file.parent.mkdir(parents=True) + new_file.write_text("new", encoding="utf-8") + + messages = recover_pending_journals(kb_dir) + + assert any("Rolled back interrupted add journal" in message for message in messages) + assert target.read_text(encoding="utf-8") == "before" + assert not new_file.exists() + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_mark_committed_prevents_recovery_rollback(tmp_path): + """A snapshot marked committed must be discarded (not rolled back) by + recovery — the commit signal that protects a completed mutation from + being undone when post-commit cleanup fails. 
+ """ + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [target], operation="add", details={"doc_name": "doc"} + ) + target.write_text("after", encoding="utf-8") # the "committed" mutation + snapshot.mark_committed() + + messages = recover_pending_journals(kb_dir) + + assert any("Cleaned terminal mutation journal" in m for m in messages) + assert target.read_text(encoding="utf-8") == "after" # NOT rolled back + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_snapshot_paths_cleans_backup_dir_on_failure(tmp_path): + """A partially-created snapshot must not leak its backup dir: on any + failure before the journal is written, snapshot_paths removes the + rollback dir it created (recover_pending_journals only scans journals + and could never reach it otherwise). + """ + kb_dir = tmp_path / "kb" + kb_dir.mkdir() + # A target that resolves OUTSIDE kb_dir makes relative_to(kb_dir) raise + # mid-loop, after backup_dir was already mkdir'd. + outside = tmp_path / "outside.txt" + outside.write_text("hi", encoding="utf-8") + + with pytest.raises(ValueError): + snapshot_paths(kb_dir, [outside], operation="add", details={}) + + staging = kb_dir / ".openkb" / "staging" + if staging.exists(): + assert not any(staging.iterdir()) # no orphan rollback-* dir + + +def test_exclusive_lock_drains_active_journal_before_yielding(tmp_path): + """Recovery runs on every exclusive-lock acquisition, not just the add path. + + ``recover_pending_journals`` is wired into ``kb_lock``'s first exclusive + acquisition, so any mutation command — ``remove``/``recompile``/``lint``/ + ``chat``, all of which take ``kb_ingest_lock`` directly — drains a crashed + predecessor's active journal before it mutates. 
This is the regression + guard for the bug where an ``add`` crash left an active journal that an + intervening ``remove`` ignored and a later ``add`` then rolled back over + the remove's edits. + """ + from openkb.locks import kb_ingest_lock + + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + # Simulate a crashed add: snapshot taken, file mutated, but mark_committed + # never ran — an ACTIVE journal is left on disk. + snapshot_paths(kb_dir, [target], operation="add", details={"doc_name": "doc"}) + target.write_text("after", encoding="utf-8") + + # Any exclusive-lock holder drains before its body runs. + with kb_ingest_lock(openkb_dir): + assert target.read_text(encoding="utf-8") == "before" + + assert target.read_text(encoding="utf-8") == "before" + assert not any((openkb_dir / "journal").glob("*.json"))