diff --git a/README.md b/README.md
index 100c9fa6..d797cd33 100644
--- a/README.md
+++ b/README.md
@@ -333,6 +333,7 @@ OpenKB settings are initialized by `openkb init` and stored in `.openkb/config.y
model: gpt-5.4 # LLM model (any LiteLLM-supported provider)
language: en # Wiki output language
pageindex_threshold: 20 # PDF pages threshold for PageIndex
+file_processing_jobs: 2 # Files to prepare concurrently during `openkb add
`
```
Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/providers) (OpenAI models can omit the prefix):
@@ -347,6 +348,8 @@ Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/p
Advanced options (entity_types, extra_headers, OAuth):
+`file_processing_jobs` (default `2`): number of files prepared concurrently during `openkb add `. Only the preparation stage is parallelized (hashing, duplicate prefiltering, raw/source staging, conversion); live-KB mutation stays serialized under the mutation lock, so raising it helps mainly when conversion is the bottleneck.
+
`entity_types` (optional): a YAML list overriding the entity-type vocabulary used for entity pages; omit it to use the default `person`, `organization`, `place`, `product`, `work`, `event`, `other`.
`extra_headers` (optional): a YAML mapping of extra HTTP headers sent with every LLM request (forwarded to LiteLLM's `extra_headers`). Useful for providers that expect custom headers, e.g. GitHub Copilot IDE-auth headers:
diff --git a/config.yaml.example b/config.yaml.example
index 45b4b8c3..aff75f0a 100644
--- a/config.yaml.example
+++ b/config.yaml.example
@@ -1,6 +1,10 @@
model: gpt-5.4 # LLM model (any LiteLLM-supported provider)
language: en # Wiki output language
pageindex_threshold: 20 # PDF pages threshold for PageIndex
+file_processing_jobs: 2 # Number of files to prepare concurrently during `openkb add `
+# Note: this parallelizes hashing/conversion/staging only. Live KB publish,
+# PageIndex indexing, LLM compilation, registry updates, and log writes remain
+# serialized under the KB mutation lock.
# Optional: extra HTTP headers sent with every LLM request (forwarded to
# LiteLLM's extra_headers). Some providers need these — e.g. GitHub Copilot
diff --git a/openkb/cli.py b/openkb/cli.py
index b48ebc17..7abbfe2f 100644
--- a/openkb/cli.py
+++ b/openkb/cli.py
@@ -8,11 +8,16 @@
warnings.filterwarnings("ignore")
import asyncio
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import contextmanager
+from dataclasses import dataclass
import json
import logging
import shutil
import sys
import time
+import unicodedata
+import uuid
from functools import wraps
from pathlib import Path
from typing import Literal
@@ -45,9 +50,10 @@ def filter(self, record: logging.LogRecord) -> bool:
DEFAULT_CONFIG, load_config, save_config, load_global_config, register_kb,
resolve_extra_headers, set_extra_headers,
)
-from openkb.converter import _registry_path, convert_document
+from openkb.converter import _registry_path, _sanitize_stem, convert_document, resolve_doc_name
from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock
from openkb.log import append_log
+from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths
from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS
# Suppress warnings after all imports — markitdown overrides filters at import time
@@ -67,6 +73,7 @@ def filter(self, record: logging.LogRecord) -> bool:
# handled by LiteLLM itself) — no API key env var is needed, so the
# missing-key warning would be a false alarm for them.
_OAUTH_PROVIDERS = {"chatgpt", "github_copilot"}
+_AddOutcome = Literal["added", "skipped", "failed"]
def _extract_provider(model: str) -> str | None:
@@ -274,13 +281,468 @@ def _clear_existing_skill_dir(kb_dir: Path, name: str) -> None:
shutil.rmtree(target)
-def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]:
- """Convert, index, and compile a single document under the KB mutation lock."""
+@dataclass
+class _PreparedAdd:
+ file_path: Path
+ result: object | None = None
+ staging_dir: Path | None = None
+ outcome: _AddOutcome | None = None
+ error_stage: str = ""
+ error: Exception | None = None
+
+
+def _positive_int(value: object, default: int) -> int:
+ try:
+ parsed = int(value)
+ except (TypeError, ValueError):
+ return default
+ return max(1, parsed)
+
+
+def _log_add_timing(
+ stage: str,
+ file_path: Path | None,
+ started_at: float,
+ **fields: object,
+) -> None:
+ logger = logging.getLogger(__name__)
+ if not logger.isEnabledFor(logging.DEBUG):
+ return
+ elapsed = time.perf_counter() - started_at
+ subject = file_path.name if file_path is not None else ""
+ suffix = "".join(f" {key}={value}" for key, value in fields.items())
+ logger.debug("add %s for %s took %.3fs%s", stage, subject, elapsed, suffix)
+
+
+def _prefilter_known_files(
+ files: list[Path],
+ kb_dir: Path,
+ jobs: int,
+) -> tuple[list[Path], dict[Path, _PreparedAdd]]:
+ """Hash directory inputs before conversion and skip hashes already known."""
+ from openkb.state import HashRegistry
+
+ started = time.perf_counter()
+ with _kb_mutation_lock(kb_dir):
+ registry = HashRegistry.memory(
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries()
+ )
+
+ def hash_one(file_path: Path) -> tuple[Path, str | None, Exception | None]:
+ try:
+ return file_path, HashRegistry.hash_file(file_path), None
+ except Exception as exc:
+ return file_path, None, exc
+
+ if jobs == 1:
+ hashed = [hash_one(file_path) for file_path in files]
+ else:
+ with ThreadPoolExecutor(max_workers=jobs) as executor:
+ futures = [executor.submit(hash_one, file_path) for file_path in files]
+ hashed = [future.result() for future in futures]
+
+ remaining: list[Path] = []
+ prepared: dict[Path, _PreparedAdd] = {}
+ for file_path, file_hash, error in hashed:
+ if error is not None:
+ prepared[file_path] = _PreparedAdd(
+ file_path=file_path,
+ outcome="failed",
+ error_stage="Hash",
+ error=error,
+ )
+ continue
+ if file_hash is not None and registry.is_known(file_hash):
+ prepared[file_path] = _PreparedAdd(
+ file_path=file_path,
+ outcome="skipped",
+ )
+ continue
+ remaining.append(file_path)
+
+ _log_add_timing(
+ "prefilter",
+ None,
+ started,
+ total=len(files),
+ remaining=len(remaining),
+ skipped=len(prepared),
+ jobs=jobs,
+ )
+ return remaining, prepared
+
+
+def _reserve_batch_doc_names(files: list[Path], kb_dir: Path) -> dict[Path, str]:
+ """Reserve doc_names in scan order so parallel prepare matches serial add."""
+ from openkb.state import HashRegistry
+
+ # In-memory registry view: same resolve contract as the on-disk registry
+ # but add() never persists. persist_legacy=True below therefore only
+ # *consumes* a matched legacy entry in memory (backfilling its path) so a
+ # later same-stem file in this batch no longer re-matches it via
+ # find_legacy_by_stem — the idempotency fix. With a persisting registry
+ # that same call would write disk on every reservation.
+ batch_registry = HashRegistry.memory(
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries()
+ )
+ reserved: dict[Path, str] = {}
+ for file_path in files:
+ doc_name = resolve_doc_name(
+ file_path,
+ kb_dir,
+ batch_registry,
+ persist_legacy=True,
+ )
+ reserved[file_path] = doc_name
+ # Future files in the same batch must see this name as already taken;
+ # otherwise same-stem files prepared in parallel could all claim it.
+ batch_registry.add(
+ f"batch:{len(reserved)}:{_registry_path(file_path, kb_dir)}",
+ {
+ "name": file_path.name,
+ "doc_name": doc_name,
+ "path": _registry_path(file_path, kb_dir),
+ },
+ )
+ return reserved
+
+
+def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path:
+ safe = _sanitize_stem(file_path.stem)
+ # uuid (not time.time_ns()) for uniqueness: two same-stem files sanitize to
+ # the same `safe`, so the timestamp was the only differentiator and a
+ # repeated wall-clock ns would make mkdir(exist_ok=False) crash the batch.
+ path = kb_dir / ".openkb" / "staging" / f"add-{safe}-{uuid.uuid4().hex[:8]}"
+ path.mkdir(parents=True, exist_ok=False)
+ return path
+
+
+def _cleanup_staging(path: Path | None) -> None:
+ if path is not None:
+ shutil.rmtree(path, ignore_errors=True)
+
+
+def _prepare_add_file(
+ file_path: Path,
+ kb_dir: Path,
+ staging_dir: Path | None,
+ doc_name: str | None = None,
+) -> _PreparedAdd:
+ """Run file-local conversion into an optional staging directory."""
+ logger = logging.getLogger(__name__)
+ started = time.perf_counter()
+ try:
+ result = convert_document(
+ file_path,
+ kb_dir,
+ assume_locked=True,
+ staging_dir=staging_dir,
+ doc_name_override=doc_name,
+ )
+ except Exception as exc:
+ logger.debug("Conversion traceback:", exc_info=True)
+ _log_add_timing("prepare", file_path, started, outcome="failed")
+ return _PreparedAdd(
+ file_path=file_path,
+ staging_dir=staging_dir,
+ outcome="failed",
+ error_stage="Conversion",
+ error=exc,
+ )
+ if result.skipped:
+ prepared = _PreparedAdd(
+ file_path=file_path,
+ result=result,
+ staging_dir=staging_dir,
+ outcome="skipped",
+ )
+ _log_add_timing("prepare", file_path, started, outcome="skipped")
+ return prepared
+ prepared = _PreparedAdd(file_path=file_path, result=result, staging_dir=staging_dir)
+ _log_add_timing("prepare", file_path, started, outcome="prepared")
+ return prepared
+
+
+def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]:
+ final_raw = None
+ final_source = None
+ if result.raw_path is not None:
+ final_raw = kb_dir / "raw" / result.raw_path.name
+ if result.source_path is not None:
+ final_source = kb_dir / "wiki" / "sources" / result.source_path.name
+ return final_raw, final_source
+
+
+def _snapshot_add_paths(kb_dir: Path, doc_name: str, final_raw: Path | None, final_source: Path | None) -> list[Path]:
+ paths = [
+ kb_dir / ".openkb" / "hashes.json",
+ kb_dir / ".openkb" / "pageindex.db",
+ # SQLite may keep recently-written PageIndex state in sidecar files.
+ # Snapshot missing paths too, so rollback removes sidecars created
+ # during a failed long-document ingest.
+ kb_dir / ".openkb" / "pageindex.db-wal",
+ kb_dir / ".openkb" / "pageindex.db-shm",
+ kb_dir / ".openkb" / "pageindex.db-journal",
+ kb_dir / ".openkb" / "files",
+ kb_dir / "wiki" / "summaries" / f"{doc_name}.md",
+ kb_dir / "wiki" / "sources" / f"{doc_name}.json",
+ kb_dir / "wiki" / "sources" / "images" / doc_name,
+ kb_dir / "wiki" / "concepts",
+ kb_dir / "wiki" / "entities",
+ kb_dir / "wiki" / "index.md",
+ kb_dir / "wiki" / "log.md",
+ ]
+ if final_raw is not None:
+ paths.append(final_raw)
+ if final_source is not None:
+ paths.append(final_source)
+ return paths
+
+
+def _run_compile_with_retry(coro_factory, label: str) -> None:
+ """Run an async compile coroutine once, retrying once after 2s on failure.
+
+ ``coro_factory`` builds a fresh coroutine each call (an ``asyncio.run``-consumed
+ coroutine can't be awaited twice). Raises on the second failure so the caller's
+ snapshot-rollback path runs.
+ """
+ logger = logging.getLogger(__name__)
+ click.echo(f" {label}...")
+ started = time.perf_counter()
+ for attempt in range(2):
+ try:
+ asyncio.run(coro_factory())
+ _log_add_timing("compile", None, started, label=label)
+ return
+ except Exception as exc:
+ if attempt == 0:
+ click.echo(" Retrying compilation in 2s...")
+ time.sleep(2)
+ else:
+ click.echo(f" [ERROR] Compilation failed: {exc}")
+ logger.debug("Compilation traceback:", exc_info=True)
+ raise
+
+
+def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _AddOutcome:
+ """Commit a prepared add while the caller holds the KB mutation lock."""
+ from openkb.agent.compiler import compile_long_doc, compile_short_doc
+ from openkb.state import HashRegistry
+
+ logger = logging.getLogger(__name__)
+ file_path = prepared.file_path
+ started = time.perf_counter()
+
+ def finish(outcome: _AddOutcome) -> _AddOutcome:
+ _log_add_timing("commit", file_path, started, outcome=outcome)
+ return outcome
+
+ if prepared.outcome == "failed":
+ click.echo(f" [ERROR] {prepared.error_stage} failed: {prepared.error}")
+ _cleanup_staging(prepared.staging_dir)
+ return finish("failed")
+ if prepared.outcome == "skipped":
+ click.echo(f" [SKIP] Already in knowledge base: {file_path.name}")
+ _cleanup_staging(prepared.staging_dir)
+ return finish("skipped")
+ if prepared.result is None:
+ click.echo(f" [ERROR] Conversion failed: no result for {file_path.name}")
+ _cleanup_staging(prepared.staging_dir)
+ return finish("failed")
+
+ result = prepared.result
+ registry = HashRegistry(kb_dir / ".openkb" / "hashes.json")
+ if result.file_hash and registry.is_known(result.file_hash):
+ click.echo(f" [SKIP] Already in knowledge base: {file_path.name}")
+ _cleanup_staging(prepared.staging_dir)
+ return finish("skipped")
+
+ doc_name = result.doc_name or file_path.stem
+ norm_doc_name = unicodedata.normalize("NFKC", doc_name)
+ norm_file_name = unicodedata.normalize("NFKC", file_path.name)
+ path_key = _registry_path(file_path, kb_dir)
+ for existing_hash, meta in registry.all_entries().items():
+ existing_name = meta.get("doc_name") or Path(meta.get("name", "")).stem
+ if (
+ existing_hash != result.file_hash
+ and unicodedata.normalize("NFKC", existing_name) == norm_doc_name
+ ):
+ existing_path = meta.get("path")
+ # Same document iff it shares our path, OR it is a legacy
+ # (pre-path-index) entry with no path whose filename matches — the
+ # pre-collision form of this same document being re-ingested. A
+ # path-indexed entry must NOT be matched by filename alone: two
+ # different files can share a name, and a concurrent add could
+ # claim this doc_name in the window between reservation and commit.
+ # NFKC-normalize names: macOS reports filenames in NFD, so a raw
+ # `==` would mis-classify a same-document re-add as a conflict.
+ if existing_path == path_key or (
+ not existing_path
+ and unicodedata.normalize("NFKC", meta.get("name") or "") == norm_file_name
+ ):
+ continue
+ click.echo(
+ " [ERROR] Document name conflict after parallel preparation: "
+ f"'{doc_name}' is already used by {meta.get('name', 'another document')}. "
+ "Re-run this file after the current batch."
+ )
+ _cleanup_staging(prepared.staging_dir)
+ return finish("failed")
+
+ final_raw, final_source = _final_artifact_paths(result, kb_dir)
+ snapshot: MutationSnapshot | None = None
+ index_result = None
+
+ try:
+ # Take the snapshot inside the try: a failure here (disk-full copytree
+ # of concepts/entities, permission, relative_to ValueError) used to
+ # propagate uncaught — aborting the whole remaining batch and leaking
+ # an unjournaled backup dir. snapshot_paths now self-cleans its backup
+ # dir on partial failure, so when snapshot stays None there is nothing
+ # to roll back and the except branch handles that case.
+ snapshot = snapshot_paths(
+ kb_dir,
+ _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source),
+ operation="add",
+ details={"file_hash": result.file_hash, "name": file_path.name, "doc_name": doc_name},
+ )
+ publish_staged_tree(prepared.staging_dir, kb_dir)
+ if final_raw is not None:
+ result.raw_path = final_raw
+ if final_source is not None:
+ result.source_path = final_source
+
+ if result.is_long_doc:
+ click.echo(" Long document detected — indexing with PageIndex...")
+ index_started = time.perf_counter()
+ try:
+ from openkb.indexer import index_long_document
+
+ index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name)
+ _log_add_timing("index", file_path, index_started, doc_name=doc_name)
+ except Exception as exc:
+ click.echo(f" [ERROR] Indexing failed: {exc}")
+ logger.debug("Indexing traceback:", exc_info=True)
+ raise
+
+ summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md"
+ _run_compile_with_retry(
+ lambda: compile_long_doc(
+ doc_name,
+ summary_path,
+ index_result.doc_id,
+ kb_dir,
+ model,
+ doc_description=index_result.description,
+ ),
+ label=f"Compiling long doc (doc_id={index_result.doc_id})",
+ )
+ else:
+ _run_compile_with_retry(
+ lambda: compile_short_doc(doc_name, result.source_path, kb_dir, model),
+ label="Compiling short doc",
+ )
+
+ if result.file_hash:
+ # Reuse the conflict-scan registry (line above): still valid under
+ # the same lock — nothing mutated it between scan and write.
+ doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".")
+ meta = {
+ "name": file_path.name,
+ "doc_name": doc_name,
+ "type": doc_type,
+ "path": _registry_path(file_path, kb_dir),
+ }
+ if result.raw_path is not None:
+ meta["raw_path"] = _registry_path(result.raw_path, kb_dir)
+ if result.source_path is not None:
+ meta["source_path"] = _registry_path(result.source_path, kb_dir)
+ if index_result is not None:
+ meta["doc_id"] = index_result.doc_id
+ registry.remove_by_doc_name(doc_name)
+ for existing_hash, existing_meta in list(registry.all_entries().items()):
+ if (
+ existing_hash != result.file_hash
+ and not existing_meta.get("doc_name")
+ and existing_meta.get("name") == file_path.name
+ ):
+ registry.remove_by_hash(existing_hash)
+ registry.add(result.file_hash, meta)
+
+ # Commit point: the registry write is durable. Mark the journal
+ # committed so that any failure in the post-commit cleanup below
+ # (log append, backup removal) cannot trigger a recovery rollback
+ # that discards this completed ingest. mark_committed is the last
+ # step inside the try — together with registry.add it forms the
+ # atomic commit group, so a failure here correctly rolls back.
+ snapshot.mark_committed()
+ except Exception:
+ if snapshot is None:
+ # snapshot_paths failed before any mutation ran; it already removed
+ # its own backup dir, so there is nothing to roll back.
+ click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.")
+ return finish("failed")
+ rollback_error = snapshot.rollback_best_effort()
+ if rollback_error is None:
+ snapshot.discard_best_effort()
+ else:
+ click.echo(
+ " [ERROR] Rollback failed; mutation journal retained for recovery: "
+ f"{snapshot.journal_path}"
+ )
+ return finish("failed")
+ finally:
+ _cleanup_staging(prepared.staging_dir)
+
+ # Post-commit side effects. These run only after the mutation is
+ # committed and the journal marked committed, so a failure here must NOT
+ # roll back — best-effort only. A stale "committed" journal left behind
+ # is harmless: the next recover_pending_journals sees status "committed"
+ # and discards it.
+ try:
+ append_log(kb_dir / "wiki", "ingest", file_path.name)
+ except Exception as exc:
+ logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc)
+ cleanup_error = snapshot.discard_best_effort()
+ if cleanup_error is not None:
+ click.echo(
+ f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}"
+ )
+ click.echo(f" [OK] {file_path.name} added to knowledge base.")
+ return finish("added")
+
+
+@contextmanager
+def _kb_mutation_lock(kb_dir: Path):
+ """Acquire the ingest lock for an add-path mutation.
+
+ Journal draining lives in :func:`openkb.locks.kb_lock` now, so every
+ exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``,
+ ``chat`` — drains pending journals on first acquisition, not just the add
+ path. This wrapper remains the add pipeline's entry point (prepare/commit
+ run inside the lock); it no longer drains separately, since doing so would
+ double-scan and double-log on every per-file commit in a directory batch.
+ """
+ started = time.perf_counter()
with kb_ingest_lock(kb_dir / ".openkb"):
- return _add_single_file_locked(file_path, kb_dir)
+ _log_add_timing("lock_wait", None, started)
+ yield
+
+def add_single_file(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome:
+ """Convert, index, and compile a single document under the KB mutation lock.
+
+ ``stage=True`` converts into an isolated staging dir before the commit
+ snapshot, so a crash between convert and commit can't orphan raw/source
+ files in the live KB. Callers whose file already lives in ``raw/`` (watch
+ mode, URL fetch) pass ``stage=False`` to keep convert's in-place
+ optimization.
+ """
+ with _kb_mutation_lock(kb_dir):
+ return _add_single_file_locked(file_path, kb_dir, stage=stage)
-def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]:
+
+def _add_single_file_locked(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome:
"""Convert, index, and compile a single document into the knowledge base.
Steps:
@@ -297,106 +759,19 @@ def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "
be an orphan) while preserving it on failure so the user can
retry without re-downloading.
"""
- from openkb.agent.compiler import compile_long_doc, compile_short_doc
- from openkb.state import HashRegistry
-
- logger = logging.getLogger(__name__)
openkb_dir = kb_dir / ".openkb"
config = load_config(openkb_dir / "config.yaml")
_setup_llm_key(kb_dir)
model: str = config.get("model", DEFAULT_CONFIG["model"])
- # 2. Convert document
click.echo(f"Adding: {file_path.name}")
- try:
- result = convert_document(file_path, kb_dir)
- except Exception as exc:
- click.echo(f" [ERROR] Conversion failed: {exc}")
- logger.debug("Conversion traceback:", exc_info=True)
- return "failed"
-
- if result.skipped:
- click.echo(f" [SKIP] Already in knowledge base: {file_path.name}")
- return "skipped"
-
- doc_name = result.doc_name or file_path.stem
- index_result = None # populated only on the long-doc branch
-
- # 3/4. Index and compile
- if result.is_long_doc:
- click.echo(f" Long document detected — indexing with PageIndex...")
- try:
- from openkb.indexer import index_long_document
- index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name)
- except Exception as exc:
- click.echo(f" [ERROR] Indexing failed: {exc}")
- logger.debug("Indexing traceback:", exc_info=True)
- return "failed"
-
- summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md"
- click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...")
- for attempt in range(2):
- try:
- asyncio.run(
- compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model,
- doc_description=index_result.description)
- )
- break
- except Exception as exc:
- if attempt == 0:
- click.echo(f" Retrying compilation in 2s...")
- time.sleep(2)
- else:
- click.echo(f" [ERROR] Compilation failed: {exc}")
- logger.debug("Compilation traceback:", exc_info=True)
- return "failed"
- else:
- click.echo(f" Compiling short doc...")
- for attempt in range(2):
- try:
- asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model))
- break
- except Exception as exc:
- if attempt == 0:
- click.echo(f" Retrying compilation in 2s...")
- time.sleep(2)
- else:
- click.echo(f" [ERROR] Compilation failed: {exc}")
- logger.debug("Compilation traceback:", exc_info=True)
- return "failed"
-
- # Register hash only after successful compilation
- if result.file_hash:
- # Construct the registry NOW, not earlier: convert_document may have
- # backfilled a legacy entry (doc_name/path) on disk via its own
- # instance, and an earlier snapshot would clobber that backfill on
- # the full rewrite in add().
- registry = HashRegistry(openkb_dir / "hashes.json")
- doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".")
- meta = {
- "name": file_path.name,
- "doc_name": doc_name,
- "type": doc_type,
- "path": _registry_path(file_path, kb_dir),
- }
- if result.raw_path is not None:
- meta["raw_path"] = _registry_path(result.raw_path, kb_dir)
- if result.source_path is not None:
- meta["source_path"] = _registry_path(result.source_path, kb_dir)
- # For long PDFs we also persist the PageIndex doc_id so `openkb
- # remove` can later call ``Collection.delete_document(doc_id)``
- # to free the managed PDF copy + SQLite row.
- if index_result is not None:
- meta["doc_id"] = index_result.doc_id
- # An edited document arrives with a new content hash; drop the
- # stale entry for the same doc_name so the registry keeps exactly
- # one entry per document.
- registry.remove_by_doc_name(doc_name)
- registry.add(result.file_hash, meta)
-
- append_log(kb_dir / "wiki", "ingest", file_path.name)
- click.echo(f" [OK] {file_path.name} added to knowledge base.")
- return "added"
+ # Stage unless the file already lives in raw/ (watch/URL), where convert's
+ # in-place optimization applies. Staging keeps convert's writes out of the
+ # live KB so a crash between convert and the commit snapshot can't orphan
+ # raw/source files with no journal to recover them.
+ staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None
+ prepared = _prepare_add_file(file_path, kb_dir, staging_dir=staging_dir)
+ return _commit_prepared_add(prepared, kb_dir, model)
# ---------------------------------------------------------------------------
@@ -602,6 +977,7 @@ def init(model, language):
"model": model,
"language": language,
"pageindex_threshold": DEFAULT_CONFIG["pageindex_threshold"],
+ "file_processing_jobs": DEFAULT_CONFIG["file_processing_jobs"],
}
save_config(openkb_dir / "config.yaml", config)
atomic_write_json(openkb_dir / "hashes.json", {})
@@ -625,7 +1001,6 @@ def init(model, language):
@cli.command()
@click.argument("path")
@click.pass_context
-@_with_kb_lock(exclusive=True)
def add(ctx, path):
"""Add a document or directory of documents at PATH to the knowledge base.
@@ -647,10 +1022,11 @@ def add(ctx, path):
# that the registry can't reach via openkb remove.
from openkb.url_ingest import looks_like_url, fetch_url_to_raw
if looks_like_url(path):
- fetched = fetch_url_to_raw(path, kb_dir)
- if fetched is None:
- return
- outcome = add_single_file(fetched, kb_dir)
+ with _kb_mutation_lock(kb_dir):
+ fetched = fetch_url_to_raw(path, kb_dir)
+ if fetched is None:
+ return
+ outcome = _add_single_file_locked(fetched, kb_dir, stage=False)
# Only clean up on dedup-skip. On "failed" we keep the file so
# the user can retry (e.g. transient LLM error during compile)
# without re-downloading — and so they don't lose data when
@@ -674,9 +1050,61 @@ def add(ctx, path):
return
total = len(files)
click.echo(f"Found {total} supported file(s) in {path}.")
- for i, f in enumerate(files, 1):
- click.echo(f"\n[{i}/{total}] ", nl=False)
- add_single_file(f, kb_dir)
+ config = load_config(kb_dir / ".openkb" / "config.yaml")
+ jobs = min(
+ total,
+ _positive_int(
+ config.get("file_processing_jobs"),
+ DEFAULT_CONFIG["file_processing_jobs"],
+ ),
+ )
+ _setup_llm_key(kb_dir)
+ model = config.get("model", DEFAULT_CONFIG["model"])
+ files_to_prepare, prepared_outcomes = _prefilter_known_files(files, kb_dir, jobs)
+ with _kb_mutation_lock(kb_dir):
+ reserved_doc_names = _reserve_batch_doc_names(files_to_prepare, kb_dir)
+ if jobs == 1:
+ for i, f in enumerate(files, 1):
+ if f in prepared_outcomes:
+ prepared = prepared_outcomes[f]
+ click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}")
+ else:
+ click.echo(f"\n[{i}/{total}] ", nl=False)
+ # Stage exactly like the multi-worker path: prepare must never
+ # write the live KB unlocked. staging_dir=None would make
+ # convert_document write raw/ and wiki/sources/ straight into
+ # kb_dir with no lock and no mutation journal (orphan on crash).
+ prepared = _prepare_add_file(
+ f,
+ kb_dir,
+ staging_dir=_staging_dir_for(kb_dir, f),
+ doc_name=reserved_doc_names[f],
+ )
+ with _kb_mutation_lock(kb_dir):
+ _commit_prepared_add(prepared, kb_dir, model)
+ return
+
+ click.echo(f"Preparing files with {jobs} worker(s).")
+ with ThreadPoolExecutor(max_workers=jobs) as executor:
+ futures = {
+ f: executor.submit(
+ _prepare_add_file,
+ f,
+ kb_dir,
+ _staging_dir_for(kb_dir, f),
+ reserved_doc_names[f],
+ )
+ for f in files_to_prepare
+ }
+ # Commit in scan order even though prepare finishes out of order.
+ # That keeps log.md and CLI progress stable for human audit.
+ for i, f in enumerate(files, 1):
+ prepared = prepared_outcomes.get(f)
+ if prepared is None:
+ prepared = futures[f].result()
+ click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}")
+ with _kb_mutation_lock(kb_dir):
+ _commit_prepared_add(prepared, kb_dir, model)
else:
if target.suffix.lower() not in SUPPORTED_EXTENSIONS:
click.echo(
@@ -1412,7 +1840,7 @@ def on_new_files(paths):
f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}"
)
continue
- add_single_file(fp, kb_dir)
+ add_single_file(fp, kb_dir, stage=False)
click.echo(f"Watching {raw_dir} for new documents. Press Ctrl+C to stop.")
watch_directory(raw_dir, on_new_files)
diff --git a/openkb/config.py b/openkb/config.py
index 9d0d6cd4..4c44addd 100644
--- a/openkb/config.py
+++ b/openkb/config.py
@@ -16,6 +16,7 @@
"model": "gpt-5.4-mini",
"language": "en",
"pageindex_threshold": 20,
+ "file_processing_jobs": 2,
}
# Default entity-type vocabulary. Overridable per-KB via the optional
diff --git a/openkb/converter.py b/openkb/converter.py
index 9ebac762..b5c2add6 100644
--- a/openkb/converter.py
+++ b/openkb/converter.py
@@ -14,6 +14,7 @@
from openkb.config import load_config
from openkb.images import copy_relative_images, extract_base64_images, convert_pdf_with_images
+from openkb.locks import atomic_write_text, kb_ingest_lock
from openkb.state import HashRegistry
logger = logging.getLogger(__name__)
@@ -29,6 +30,7 @@ class ConvertResult:
skipped: bool = False
file_hash: str | None = None # For deferred hash registration
doc_name: str | None = None # Stable wiki name (collision-resistant)
+ staging_dir: Path | None = None
def _registry_path(path: Path, kb_dir: Path) -> str:
@@ -70,7 +72,13 @@ def _name_taken(candidate: str, registry: HashRegistry) -> bool:
return False
-def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str:
+def resolve_doc_name(
+ src: Path,
+ kb_dir: Path,
+ registry: HashRegistry,
+ *,
+ persist_legacy: bool = True,
+) -> str:
"""Resolve the stable wiki name for ``src`` (Scheme A).
Identity is keyed by path: a source we've seen before (same path, even
@@ -93,9 +101,10 @@ def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str:
file_hash, meta = legacy
meta = dict(meta)
name = meta.get("doc_name") or Path(meta.get("name", "")).stem
- meta["doc_name"] = name
- meta["path"] = path_key
- registry.add(file_hash, meta) # backfill + persist
+ if persist_legacy:
+ meta["doc_name"] = name
+ meta["path"] = path_key
+ registry.add(file_hash, meta) # backfill + persist
return name
candidate = _sanitize_stem(src.stem)
@@ -111,7 +120,14 @@ def get_pdf_page_count(path: Path) -> int:
return doc.page_count
-def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
+def convert_document(
+ src: Path,
+ kb_dir: Path,
+ *,
+ assume_locked: bool = False,
+ staging_dir: Path | None = None,
+ doc_name_override: str | None = None,
+) -> ConvertResult:
"""Convert a document and integrate it into the knowledge base.
Steps:
@@ -122,12 +138,23 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
5. Otherwise — run MarkItDown, extract base64 images, save to ``wiki/sources/``.
6. Register hash in the registry.
"""
+ if not assume_locked:
+ with kb_ingest_lock(kb_dir / ".openkb"):
+ return convert_document(
+ src,
+ kb_dir,
+ assume_locked=True,
+ staging_dir=staging_dir,
+ doc_name_override=doc_name_override,
+ )
+
# ------------------------------------------------------------------
# Load config & state
# ------------------------------------------------------------------
openkb_dir = kb_dir / ".openkb"
config = load_config(openkb_dir / "config.yaml")
threshold: int = config.get("pageindex_threshold", 20)
+ artifact_root = staging_dir if staging_dir is not None else kb_dir
registry = HashRegistry(openkb_dir / "hashes.json")
# ------------------------------------------------------------------
@@ -141,15 +168,23 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
skipped=True,
file_hash=file_hash,
doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem,
+ staging_dir=staging_dir,
)
- doc_name = resolve_doc_name(src, kb_dir, registry)
+ # Batch ingest reserves doc_names before parallel conversion so same-stem
+ # files behave like serial adds while still writing to isolated staging dirs.
+ doc_name = doc_name_override or resolve_doc_name(
+ src,
+ kb_dir,
+ registry,
+ persist_legacy=staging_dir is None,
+ )
# ------------------------------------------------------------------
# 2. Copy to raw/
# ------------------------------------------------------------------
- raw_dir = kb_dir / "raw"
+ raw_dir = artifact_root / "raw"
raw_dir.mkdir(parents=True, exist_ok=True)
- if src.resolve().is_relative_to(raw_dir.resolve()):
+ if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()):
# Watch mode: the file already lives in raw/ — don't copy/rename.
raw_dest = src
else:
@@ -173,14 +208,15 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
is_long_doc=True,
file_hash=file_hash,
doc_name=doc_name,
+ staging_dir=staging_dir,
)
# ------------------------------------------------------------------
# 4/5. Convert to Markdown
# ------------------------------------------------------------------
- sources_dir = kb_dir / "wiki" / "sources"
+ sources_dir = artifact_root / "wiki" / "sources"
sources_dir.mkdir(parents=True, exist_ok=True)
- images_dir = kb_dir / "wiki" / "sources" / "images" / doc_name
+ images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name
images_dir.mkdir(parents=True, exist_ok=True)
if src.suffix.lower() == ".md":
@@ -197,11 +233,12 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
markdown = extract_base64_images(markdown, doc_name, images_dir)
dest_md = sources_dir / f"{doc_name}.md"
- dest_md.write_text(markdown, encoding="utf-8")
+ atomic_write_text(dest_md, markdown)
return ConvertResult(
raw_path=raw_dest,
source_path=dest_md,
file_hash=file_hash,
doc_name=doc_name,
+ staging_dir=staging_dir,
)
diff --git a/openkb/locks.py b/openkb/locks.py
index 2fa2815a..72966fc1 100644
--- a/openkb/locks.py
+++ b/openkb/locks.py
@@ -9,6 +9,7 @@
import contextlib
import json
+import logging
import os
import tempfile
import threading
@@ -97,6 +98,32 @@ def _local_lock(lock_path: Path) -> _LocalRwLock:
return lock
+def _drain_pending_journals(openkb_dir: Path) -> None:
+ """Roll back any mutation journals an interrupted process left behind.
+
+ Draining recovery is part of *taking* the mutation lock, not part of any
+ one command: a process that acquires the exclusive lock must restore the
+ KB to a known state before mutating it. Wiring this into ``kb_lock`` means
+ every exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``,
+ ``chat`` — drains on first acquisition, so an ``add`` that crashed mid-
+ commit cannot leave an active journal on disk that a later ``add`` rolls
+ back over the top of an intervening ``remove``/``recompile`` (clobbering
+ those edits). ``openkb_dir`` is the ``kb_dir/.openkb`` directory callers
+ pass to ``kb_lock``; the journal lives at ``openkb_dir/journal``, so the
+ KB root is ``openkb_dir.parent``.
+
+ The delayed import breaks the ``locks`` ↔ ``mutation`` cycle (``mutation``
+ imports atomic-write helpers from this module at top level). Called only
+ on first OS-lock acquisition (the reentrant branch above returns early),
+ never on a read lock, so queries pay nothing.
+ """
+ from openkb.mutation import recover_pending_journals
+
+ log = logging.getLogger(__name__)
+ for message in recover_pending_journals(openkb_dir.parent):
+ log.warning(message)
+
+
@contextlib.contextmanager
def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]:
"""Hold a KB-level advisory lock."""
@@ -134,6 +161,8 @@ def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]:
flock(fh, exclusive=exclusive)
held[resolved] = (1, 0) if exclusive else (0, 1)
try:
+ if exclusive:
+ _drain_pending_journals(openkb_dir)
yield
finally:
held.pop(resolved, None)
diff --git a/openkb/mutation.py b/openkb/mutation.py
new file mode 100644
index 00000000..cdac260b
--- /dev/null
+++ b/openkb/mutation.py
@@ -0,0 +1,216 @@
+"""Transactional helpers for KB mutation paths."""
+from __future__ import annotations
+
+import json
+import logging
+import shutil
+import uuid
+from dataclasses import dataclass, field
+from pathlib import Path
+
+from openkb.locks import atomic_write_bytes, atomic_write_json
+
+logger = logging.getLogger(__name__)
+
+
+def _copy_file_atomic(src: Path, dest: Path) -> None:
+ dest.parent.mkdir(parents=True, exist_ok=True)
+ atomic_write_bytes(dest, src.read_bytes())
+
+
+@dataclass
+class MutationSnapshot:
+ """Snapshot of final KB paths touched by a mutation attempt."""
+
+ kb_dir: Path
+ backup_dir: Path
+ journal_path: Path
+ operation: str
+ details: dict = field(default_factory=dict)
+ entries: dict[Path, Path | None] = field(default_factory=dict)
+
+ def _journal_data(self, status: str) -> dict:
+ return {
+ "version": 1,
+ "operation": self.operation,
+ "status": status,
+ "kb_dir": str(self.kb_dir),
+ "backup_dir": str(self.backup_dir),
+ "details": self.details,
+ "entries": [
+ {
+ "target": str(target),
+ "backup": str(backup) if backup is not None else None,
+ }
+ for target, backup in self.entries.items()
+ ],
+ }
+
+ def write_journal(self, status: str) -> None:
+ self.journal_path.parent.mkdir(parents=True, exist_ok=True)
+ atomic_write_json(self.journal_path, self._journal_data(status))
+
+ def mark_committed(self) -> None:
+ """Mark the journal committed without removing the backup.
+
+ Call this the instant the mutation is durably applied (e.g. the
+ registry write has landed) so a subsequent
+ :func:`recover_pending_journals` discards the journal instead of
+ rolling it back. This is the commit signal; :meth:`discard` is the
+ post-commit cleanup that also removes the backup dir and journal
+ file and must itself be best-effort — it runs *after* the commit
+ point and its failure must never trigger a rollback.
+ """
+ self.write_journal("committed")
+
+ def rollback(self) -> None:
+ # Restore children before parents so directory deletes cannot remove
+ # paths that still need to be restored from a more specific backup.
+ for target, backup in sorted(
+ self.entries.items(),
+ key=lambda item: len(item[0].parts),
+ reverse=True,
+ ):
+ # Removal is unconditional; the backup (if any) is then restored
+ # in its place.
+ if target.is_dir():
+ shutil.rmtree(target, ignore_errors=True)
+ else:
+ target.unlink(missing_ok=True)
+ if backup is None:
+ continue
+ target.parent.mkdir(parents=True, exist_ok=True)
+ if backup.is_dir():
+ shutil.copytree(backup, target)
+ else:
+ _copy_file_atomic(backup, target)
+ self.write_journal("rolled_back")
+
+ def rollback_best_effort(self) -> Exception | None:
+ try:
+ self.rollback()
+ except Exception as exc:
+ logger.warning("Mutation rollback failed: %s", exc)
+ return exc
+ return None
+
+ def discard(self) -> None:
+ # Best-effort post-commit/post-rollback cleanup: callers have already
+ # written a terminal status (mark_committed or rollback), so there is
+ # nothing to re-write here — doing so would be dead work and would
+ # silently downgrade a "rolled_back" journal to "committed" moments
+ # before it is deleted.
+ shutil.rmtree(self.backup_dir, ignore_errors=True)
+ self.journal_path.unlink(missing_ok=True)
+
+ def discard_best_effort(self) -> Exception | None:
+ try:
+ self.discard()
+ except Exception as exc:
+ logger.warning("Mutation journal cleanup failed: %s", exc)
+ return exc
+ return None
+
+
+def snapshot_paths(
+ kb_dir: Path,
+ paths: list[Path],
+ *,
+ operation: str,
+ details: dict | None = None,
+) -> MutationSnapshot:
+ """Snapshot final KB paths before a mutation starts."""
+ kb_dir = kb_dir.resolve()
+ journal_id = uuid.uuid4().hex
+ backup_dir = kb_dir / ".openkb" / "staging" / f"rollback-{journal_id}"
+ backup_dir.mkdir(parents=True, exist_ok=False)
+ snapshot = MutationSnapshot(
+ kb_dir=kb_dir,
+ backup_dir=backup_dir,
+ journal_path=kb_dir / ".openkb" / "journal" / f"{journal_id}.json",
+ operation=operation,
+ details=details or {},
+ )
+ try:
+ for path in paths:
+ target = path.resolve()
+ if target in snapshot.entries:
+ continue
+ if not target.exists():
+ snapshot.entries[target] = None
+ continue
+ rel = target.relative_to(kb_dir)
+ backup = backup_dir / rel
+ backup.parent.mkdir(parents=True, exist_ok=True)
+ if target.is_dir():
+ shutil.copytree(target, backup)
+ else:
+ shutil.copy2(target, backup)
+ snapshot.entries[target] = backup
+ # The active journal is the recovery signal: once this exists, a future
+ # process can restore every recorded target even if the current one exits.
+ snapshot.write_journal("active")
+ except Exception:
+ # Partial snapshot: backup_dir exists on disk but no journal was
+ # written. recover_pending_journals only scans journals, so remove the
+ # orphan backup here — otherwise it leaks forever with nothing able to
+ # reach or clean it.
+ shutil.rmtree(backup_dir, ignore_errors=True)
+ raise
+ return snapshot
+
+
+def _snapshot_from_journal(path: Path, data: dict) -> MutationSnapshot:
+ snapshot = MutationSnapshot(
+ kb_dir=Path(data["kb_dir"]),
+ backup_dir=Path(data["backup_dir"]),
+ journal_path=path,
+ operation=data.get("operation", "mutation"),
+ details=data.get("details") or {},
+ )
+ snapshot.entries = {
+ Path(item["target"]): Path(item["backup"]) if item.get("backup") else None
+ for item in data.get("entries", [])
+ }
+ return snapshot
+
+
+def recover_pending_journals(kb_dir: Path) -> list[str]:
+ """Rollback active journals left by an interrupted process."""
+ journal_dir = kb_dir / ".openkb" / "journal"
+ if not journal_dir.is_dir():
+ return []
+ messages: list[str] = []
+ for journal_path in sorted(journal_dir.glob("*.json")):
+ try:
+ data = json.loads(journal_path.read_text(encoding="utf-8"))
+ snapshot = _snapshot_from_journal(journal_path, data)
+ status = data.get("status", "active")
+ if status in {"committed", "rolled_back"}:
+ snapshot.discard()
+ messages.append(f"Cleaned terminal mutation journal {journal_path.name}.")
+ continue
+ snapshot.rollback()
+ snapshot.discard()
+ messages.append(
+ f"Rolled back interrupted {snapshot.operation} journal {journal_path.name}."
+ )
+ except Exception as exc:
+ messages.append(
+ f"Could not recover journal {journal_path.name}: {type(exc).__name__}: {exc}"
+ )
+ return messages
+
+
+def publish_staged_tree(staging_dir: Path | None, kb_dir: Path) -> None:
+ """Copy staged raw/source artifacts into their final KB locations."""
+ if staging_dir is None or not staging_dir.exists():
+ return
+ for rel in ("raw", "wiki/sources"):
+ src_root = staging_dir / rel
+ if not src_root.exists():
+ continue
+ for src in src_root.rglob("*"):
+ if not src.is_file():
+ continue
+ _copy_file_atomic(src, kb_dir / rel / src.relative_to(src_root))
diff --git a/openkb/state.py b/openkb/state.py
index 10cb20cc..c7302770 100644
--- a/openkb/state.py
+++ b/openkb/state.py
@@ -13,12 +13,29 @@ class HashRegistry:
def __init__(self, path: Path) -> None:
self._path = path
+ self._persist_enabled = True
if path.exists():
with path.open("r", encoding="utf-8") as fh:
self._data: dict[str, dict] = json.load(fh)
else:
self._data = {}
+ @classmethod
+ def memory(cls, entries: dict[str, dict]) -> "HashRegistry":
+ """An in-memory view over ``entries`` that never writes back to disk.
+
+ Shares the on-disk registry's read/resolve contract (``get_by_path``,
+ ``find_legacy_by_stem``, ``all_entries``, ``add``) so callers that only
+ need to stage mutations in memory — batch doc_name reservation —
+ reuse the same code path instead of a parallel reimplementation.
+ ``add`` updates the in-memory dict but skips persistence.
+ """
+ reg = cls.__new__(cls)
+ reg._path = None
+ reg._persist_enabled = False
+ reg._data = {key: dict(value) for key, value in entries.items()}
+ return reg
+
# ------------------------------------------------------------------
# Query helpers
# ------------------------------------------------------------------
@@ -115,6 +132,8 @@ def remove_by_hash(self, file_hash: str) -> bool:
# ------------------------------------------------------------------
def _persist(self) -> None:
+ if not self._persist_enabled:
+ return
atomic_write_json(self._path, self._data)
# ------------------------------------------------------------------
diff --git a/tests/test_add_command.py b/tests/test_add_command.py
index 0199c9e2..f4f2f68c 100644
--- a/tests/test_add_command.py
+++ b/tests/test_add_command.py
@@ -80,17 +80,67 @@ def test_add_directory_calls_helper_for_each_file(self, tmp_path):
(docs_dir / "b.txt").write_text("B content")
(docs_dir / "ignore.xyz").write_text("skip me")
+ from openkb.cli import _PreparedAdd
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir)
+
runner = CliRunner()
- with patch("openkb.cli.add_single_file") as mock_add, \
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \
+ patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \
patch("openkb.cli._find_kb_dir", return_value=kb_dir):
runner.invoke(cli, ["add", str(docs_dir)])
- # Should be called for .md and .txt but not .xyz
- assert mock_add.call_count == 2
- called_names = {call.args[0].name for call in mock_add.call_args_list}
+ # Should be prepared/committed for .md and .txt but not .xyz
+ assert mock_prepare.call_count == 2
+ assert mock_commit.call_count == 2
+ called_names = {call.args[0].name for call in mock_prepare.call_args_list}
assert "a.md" in called_names
assert "b.txt" in called_names
assert "ignore.xyz" not in called_names
+ def test_add_directory_prefilters_known_hashes_before_prepare(self, tmp_path):
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+ encoding="utf-8",
+ )
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ known = docs_dir / "known.md"
+ unknown = docs_dir / "unknown.md"
+ known.write_text("# Known", encoding="utf-8")
+ unknown.write_text("# Unknown", encoding="utf-8")
+
+ from openkb.cli import _PreparedAdd
+ from openkb.state import HashRegistry
+
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ HashRegistry.hash_file(known),
+ {
+ "name": "known.md",
+ "doc_name": "known",
+ "type": "md",
+ "path": "docs/known.md",
+ },
+ )
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir)
+
+ runner = CliRunner()
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \
+ patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \
+ patch("openkb.cli._find_kb_dir", return_value=kb_dir):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+
+ assert result.exception is None
+ assert [call.args[0].name for call in mock_prepare.call_args_list] == ["unknown.md"]
+ assert [call.args[0].file_path.name for call in mock_commit.call_args_list] == [
+ "known.md",
+ "unknown.md",
+ ]
+ assert mock_commit.call_args_list[0].args[0].outcome == "skipped"
+
def test_add_unsupported_extension(self, tmp_path):
kb_dir = self._setup_kb(tmp_path)
doc = tmp_path / "file.xyz"
@@ -168,6 +218,45 @@ def test_add_short_doc_runs_compiler(self, tmp_path):
assert "path" in meta
assert "stale-old-hash" not in hashes
+ def test_commit_keeps_journal_when_rollback_fails(self, tmp_path):
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+
+ kb_dir = self._setup_kb(tmp_path)
+ source_path = kb_dir / "wiki" / "sources" / "broken.md"
+ source_path.write_text("# Broken", encoding="utf-8")
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "broken.md",
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "broken.md",
+ source_path=source_path,
+ file_hash="beadfeed00" * 8,
+ doc_name="broken",
+ ),
+ )
+
+ class FakeSnapshot:
+ journal_path = kb_dir / ".openkb" / "journal" / "broken.json"
+
+ def __init__(self):
+ self.discard_called = False
+
+ def rollback_best_effort(self):
+ return RuntimeError("rollback failed")
+
+ def discard_best_effort(self):
+ self.discard_called = True
+
+ fake_snapshot = FakeSnapshot()
+
+ with patch("openkb.cli.snapshot_paths", return_value=fake_snapshot), \
+ patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "failed"
+ assert fake_snapshot.discard_called is False
+
def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path):
"""Editing a pre-doc_name-era document must not fork the registry.
@@ -203,3 +292,407 @@ def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path):
new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"]
assert len(new_entries) == 1 # …exactly one entry survives
assert new_entries[0]["path"] # with path identity persisted
+
+ def test_add_directory_legacy_entry_converges_to_single_entry(self, tmp_path):
+ import json as json_mod
+
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+ encoding="utf-8",
+ )
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "old-hash", {"name": "notes.md", "type": "md"}
+ )
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ (docs_dir / "notes.md").write_text("# Notes, edited", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+ assert "OK" in result.output
+
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ assert "old-hash" not in hashes
+ new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"]
+ assert len(new_entries) == 1
+ assert new_entries[0]["path"]
+
+ def test_add_directory_same_stem_files_get_reserved_names(self, tmp_path):
+ import json as json_mod
+
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+ encoding="utf-8",
+ )
+ docs_dir = tmp_path / "docs"
+ (docs_dir / "a").mkdir(parents=True)
+ (docs_dir / "b").mkdir(parents=True)
+ (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8")
+ (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+ assert "Document name conflict" not in result.output
+ assert result.output.count("[OK]") == 2
+
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ doc_names = {meta["doc_name"] for meta in hashes.values()}
+ assert len(doc_names) == 2
+ assert "report" in doc_names
+ assert any(name.startswith("report-") for name in doc_names)
+
+ def test_add_directory_same_stem_with_legacy_entry_no_duplicate(self, tmp_path):
+ """Two same-stem files plus a legacy (path-less) entry must not both
+ reserve the legacy doc_name. ``find_legacy_by_stem`` must be consumed
+ (idempotent) across the batch so the second file gets a suffixed name
+ instead of colliding with the first.
+ """
+ import json as json_mod
+
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+ encoding="utf-8",
+ )
+ # Legacy entry: name + doc_name but NO path → find_legacy_by_stem matches "report".
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "legacy-hash", {"name": "report.md", "doc_name": "report", "type": "md"}
+ )
+ docs_dir = tmp_path / "docs"
+ (docs_dir / "a").mkdir(parents=True)
+ (docs_dir / "b").mkdir(parents=True)
+ (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8")
+ (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+ assert "Document name conflict" not in result.output
+ assert result.output.count("[OK]") == 2
+
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ report_names = [
+ m["doc_name"] for m in hashes.values() if str(m.get("doc_name", "")).startswith("report")
+ ]
+ assert len(report_names) == 2
+ assert len(set(report_names)) == 2 # no silent overwrite
+
+ def test_commit_rejects_same_filename_different_path_conflict(self, tmp_path):
+ """A path-indexed entry sharing doc_name + filename but with a
+ different path (a concurrent add of a same-named file in the
+ reservation/commit window) must be rejected, not silently
+ overwritten via the filename escape.
+ """
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ source_path = kb_dir / "wiki" / "sources" / "report.md"
+ source_path.parent.mkdir(parents=True, exist_ok=True)
+ source_path.write_text("# mine", encoding="utf-8")
+
+ # A DIFFERENT document already owns doc_name "report": different path,
+ # different hash, same filename.
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "other-hash",
+ {"name": "report.md", "doc_name": "report", "type": "md",
+ "path": "elsewhere/report.md"},
+ )
+
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "report.md",
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "report.md",
+ source_path=source_path,
+ file_hash="myhash" + "0" * 59,
+ doc_name="report",
+ ),
+ )
+
+ with patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "failed"
+ hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8"))
+ # The pre-existing document is untouched.
+ assert "other-hash" in hashes
+ assert hashes["other-hash"]["path"] == "elsewhere/report.md"
+ assert "myhash" + "0" * 59 not in hashes
+
+ def test_add_directory_jobs1_stages_each_file(self, tmp_path):
+ """jobs==1 must stage each file (pass a real staging_dir) instead of
+ writing the live KB unlocked via staging_dir=None.
+ """
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 1\n",
+ encoding="utf-8",
+ )
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ (docs_dir / "a.md").write_text("# A", encoding="utf-8")
+ (docs_dir / "b.md").write_text("# B", encoding="utf-8")
+
+ from openkb.cli import _PreparedAdd
+
+ seen_staging: list = []
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ seen_staging.append(staging_dir)
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir)
+
+ runner = CliRunner()
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \
+ patch("openkb.cli._commit_prepared_add", return_value="added"), \
+ patch("openkb.cli._find_kb_dir", return_value=kb_dir):
+ runner.invoke(cli, ["add", str(docs_dir)])
+
+ assert len(seen_staging) == 2
+ assert all(s is not None for s in seen_staging) # regression: was None
+
+ def test_commit_returns_added_when_post_commit_cleanup_fails(self, tmp_path):
+ """Once the registry write lands, a failure in journal cleanup must
+ NOT roll back the completed ingest (regression: the discard/log used
+ to live inside the rollback try).
+ """
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+
+ kb_dir = self._setup_kb(tmp_path)
+ source_path = kb_dir / "wiki" / "sources" / "ok.md"
+ source_path.parent.mkdir(parents=True, exist_ok=True)
+ source_path.write_text("# OK", encoding="utf-8")
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "ok.md",
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "ok.md",
+ source_path=source_path,
+ file_hash="ok" + "0" * 62,
+ doc_name="ok",
+ ),
+ )
+
+ class FakeSnapshot:
+ journal_path = kb_dir / ".openkb" / "journal" / "ok.json"
+
+ def mark_committed(self):
+ pass
+
+ def rollback_best_effort(self):
+ return None
+
+ def discard_best_effort(self):
+ return RuntimeError("cleanup failed")
+
+ with patch("openkb.cli.snapshot_paths", return_value=FakeSnapshot()), \
+ patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "added" # regression: was "failed" (rolled back success)
+ hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8"))
+ assert "ok" + "0" * 62 in hashes # registry write survived
+
+ def test_commit_rolls_back_real_snapshot_on_compile_failure(self, tmp_path):
+ """End-to-end rollback: a REAL snapshot + REAL publish, then a compile
+ failure, must restore the KB to its pre-add state — published raw and
+ source files removed, registry unchanged, no orphaned artifacts or
+ journal. The FakeSnapshot-based test cannot exercise this transactional
+ guarantee (the whole reason the feature exists).
+ """
+ import json as json_mod
+
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+
+ kb_dir = self._setup_kb(tmp_path)
+ pre_hashes = (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+
+ # A staging dir holding the converted artifacts that publish_staged_tree
+ # copies into the live KB before compile runs.
+ staging = tmp_path / "staging"
+ (staging / "raw").mkdir(parents=True)
+ (staging / "wiki" / "sources").mkdir(parents=True)
+ (staging / "raw" / "boom.md").write_text("# raw", encoding="utf-8")
+ source_md = staging / "wiki" / "sources" / "boom.md"
+ source_md.write_text("# converted", encoding="utf-8")
+
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "boom.md",
+ result=ConvertResult(
+ raw_path=staging / "raw" / "boom.md",
+ source_path=source_md,
+ file_hash="boom" + "0" * 60,
+ doc_name="boom",
+ ),
+ staging_dir=staging,
+ )
+
+ with patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")), \
+ patch("openkb.cli.time.sleep"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "failed"
+ # Published artifacts were rolled back (removed).
+ assert not (kb_dir / "raw" / "boom.md").exists()
+ assert not (kb_dir / "wiki" / "sources" / "boom.md").exists()
+ assert not (kb_dir / "wiki" / "summaries" / "boom.md").exists()
+ # Registry restored to pre-add state; no leaked boom entry.
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ assert hashes == json.loads(pre_hashes)
+ assert "boom" + "0" * 60 not in hashes
+ # No orphan journal/backup left behind; staging cleaned up.
+ assert not any((kb_dir / ".openkb" / "journal").glob("*.json"))
+ assert not staging.exists()
+
+ def test_add_snapshot_rolls_back_pageindex_sqlite_sidecars(self, tmp_path):
+ """Long-doc failures must not leave SQLite sidecars newer than pageindex.db."""
+ from openkb.cli import _snapshot_add_paths
+ from openkb.mutation import snapshot_paths
+
+ kb_dir = self._setup_kb(tmp_path)
+ openkb_dir = kb_dir / ".openkb"
+ (openkb_dir / "pageindex.db").write_bytes(b"before")
+
+ snapshot = snapshot_paths(
+ kb_dir,
+ _snapshot_add_paths(kb_dir, "long", None, None),
+ operation="add",
+ details={},
+ )
+
+ for suffix in ("-wal", "-shm", "-journal"):
+ (openkb_dir / f"pageindex.db{suffix}").write_bytes(b"after")
+
+ snapshot.rollback()
+ snapshot.discard()
+
+ assert (openkb_dir / "pageindex.db").read_bytes() == b"before"
+ for suffix in ("-wal", "-shm", "-journal"):
+ assert not (openkb_dir / f"pageindex.db{suffix}").exists()
+
+ def test_add_single_file_stages_unless_file_already_in_raw(self, tmp_path):
+ """stage=True (default for single-file add / chat) routes convert
+ through an isolated staging dir; stage=False (watch / URL, file
+ already in raw/) keeps convert's in-place path. The staging default
+ closes the crash-orphan window for files that don't already live in
+ raw/."""
+ from openkb.cli import _PreparedAdd, _add_single_file_locked
+
+ kb_dir = self._setup_kb(tmp_path)
+ doc = tmp_path / "test.md"
+ doc.write_text("# hi", encoding="utf-8")
+
+ captured: list = []
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ captured.append(staging_dir)
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir, outcome="skipped")
+
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \
+ patch("openkb.cli._commit_prepared_add", return_value="skipped"):
+ _add_single_file_locked(doc, kb_dir) # default stage=True
+ _add_single_file_locked(doc, kb_dir, stage=False)
+
+ assert captured[0] is not None # staged by default → no live-KB write pre-snapshot
+ assert captured[1] is None # in-place for watch/URL (file already in raw/)
+
+ def test_commit_conflict_guard_normalizes_unicode_filenames(self, tmp_path):
+ """A legacy (path-less) entry whose name is stored NFC must match a
+ file whose name the filesystem reports as NFD (macOS HFS+/APFS), so a
+ same-document re-add is allowed instead of mis-reported as a conflict.
+ The guard NFKC-normalizes both sides; a raw ``==`` would diverge."""
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ import unicodedata as _ud
+ nfc_name = "r\u00e9sum\u00e9.pdf" # NFC: é = U+00E9 (composed)
+ nfd_name = _ud.normalize("NFD", nfc_name) # NFD: e + U+0301 (decomposed)
+ assert nfc_name != nfd_name # raw bytes differ
+
+ source_path = kb_dir / "wiki" / "sources" / "resume.md"
+ source_path.parent.mkdir(parents=True, exist_ok=True)
+ source_path.write_text("# cv", encoding="utf-8")
+
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "legacy-hash", {"name": nfc_name, "doc_name": "resume", "type": "pdf"}
+ )
+
+ prepared = _PreparedAdd(
+ file_path=tmp_path / nfd_name,
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "resume.pdf",
+ source_path=source_path,
+ file_hash="new" + "0" * 61,
+ doc_name="resume",
+ ),
+ )
+
+ with patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ # NFC-vs-NFD is the same document, not a conflict → ingest proceeds.
+ assert outcome == "added"
+
+ def test_add_directory_jobs_gt1_runs_real_pipeline(self, tmp_path):
+ """jobs>1 ThreadPoolExecutor 路径的端到端测试。
+
+ 其余 jobs>1 测试都 mock 了 _prepare_add_file 和 _commit_prepared_add,
+ 所以真正的并发分支——futures 按扫描顺序提交、_staging_dir_for 分配、
+ prepared_outcomes.get(f) or futures[f].result() 回退、publish_staged_tree
+ 发布、registry 写入、staging 清理——从不被执行。这里用真实 prepare + 真实
+ commit,只 mock LLM compile,让最复杂的新路径真正跑一遍。
+ """
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 3\n",
+ encoding="utf-8",
+ )
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ for letter in ("a", "b", "c"):
+ (docs_dir / f"{letter}.md").write_text(f"# {letter}", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+
+ assert result.exception is None, result.output
+ assert result.output.count("[OK]") == 3
+ hashes = json.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ assert len(hashes) == 3
+ assert {meta["doc_name"] for meta in hashes.values()} == {"a", "b", "c"}
+ # Staging dirs cleaned up after each commit.
+ staging = kb_dir / ".openkb" / "staging"
+ if staging.exists():
+ assert not any(p.name.startswith("add-") for p in staging.iterdir())
+ # Source artifacts published from staging into the live KB.
+ for letter in ("a", "b", "c"):
+ assert (kb_dir / "wiki" / "sources" / f"{letter}.md").exists()
diff --git a/tests/test_mutation.py b/tests/test_mutation.py
new file mode 100644
index 00000000..1169c967
--- /dev/null
+++ b/tests/test_mutation.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+import pytest
+
+from openkb.mutation import recover_pending_journals, snapshot_paths
+
+
+def test_recover_pending_add_journal_rolls_back_files(tmp_path):
+ kb_dir = tmp_path
+ openkb_dir = kb_dir / ".openkb"
+ openkb_dir.mkdir()
+ target = kb_dir / "wiki" / "summaries" / "doc.md"
+ target.parent.mkdir(parents=True)
+ target.write_text("before", encoding="utf-8")
+ new_file = kb_dir / "wiki" / "sources" / "doc.md"
+
+ snapshot_paths(
+ kb_dir,
+ [target, new_file],
+ operation="add",
+ details={"doc_name": "doc"},
+ )
+ target.write_text("after", encoding="utf-8")
+ new_file.parent.mkdir(parents=True)
+ new_file.write_text("new", encoding="utf-8")
+
+ messages = recover_pending_journals(kb_dir)
+
+ assert any("Rolled back interrupted add journal" in message for message in messages)
+ assert target.read_text(encoding="utf-8") == "before"
+ assert not new_file.exists()
+ assert not any((openkb_dir / "journal").glob("*.json"))
+
+
+def test_mark_committed_prevents_recovery_rollback(tmp_path):
+ """A snapshot marked committed must be discarded (not rolled back) by
+ recovery — the commit signal that protects a completed mutation from
+ being undone when post-commit cleanup fails.
+ """
+ kb_dir = tmp_path
+ openkb_dir = kb_dir / ".openkb"
+ openkb_dir.mkdir()
+ target = kb_dir / "wiki" / "summaries" / "doc.md"
+ target.parent.mkdir(parents=True)
+ target.write_text("before", encoding="utf-8")
+
+ snapshot = snapshot_paths(
+ kb_dir, [target], operation="add", details={"doc_name": "doc"}
+ )
+ target.write_text("after", encoding="utf-8") # the "committed" mutation
+ snapshot.mark_committed()
+
+ messages = recover_pending_journals(kb_dir)
+
+ assert any("Cleaned terminal mutation journal" in m for m in messages)
+ assert target.read_text(encoding="utf-8") == "after" # NOT rolled back
+ assert not any((openkb_dir / "journal").glob("*.json"))
+
+
+def test_snapshot_paths_cleans_backup_dir_on_failure(tmp_path):
+ """A partially-created snapshot must not leak its backup dir: on any
+ failure before the journal is written, snapshot_paths removes the
+ rollback dir it created (recover_pending_journals only scans journals
+ and could never reach it otherwise).
+ """
+ kb_dir = tmp_path / "kb"
+ kb_dir.mkdir()
+ # A target that resolves OUTSIDE kb_dir makes relative_to(kb_dir) raise
+ # mid-loop, after backup_dir was already mkdir'd.
+ outside = tmp_path / "outside.txt"
+ outside.write_text("hi", encoding="utf-8")
+
+ with pytest.raises(ValueError):
+ snapshot_paths(kb_dir, [outside], operation="add", details={})
+
+ staging = kb_dir / ".openkb" / "staging"
+ if staging.exists():
+ assert not any(staging.iterdir()) # no orphan rollback- dir
+
+
+def test_exclusive_lock_drains_active_journal_before_yielding(tmp_path):
+ """Recovery runs on every exclusive-lock acquisition, not just the add path.
+
+ ``recover_pending_journals`` is wired into ``kb_lock``'s first exclusive
+ acquisition, so any mutation command — ``remove``/``recompile``/``lint``/
+ ``chat``, all of which take ``kb_ingest_lock`` directly — drains a crashed
+ predecessor's active journal before it mutates. This is the regression
+ guard for the bug where an ``add`` crash left an active journal that an
+ intervening ``remove`` ignored and a later ``add`` then rolled back over
+ the remove's edits.
+ """
+ from openkb.locks import kb_ingest_lock
+
+ kb_dir = tmp_path
+ openkb_dir = kb_dir / ".openkb"
+ openkb_dir.mkdir()
+ target = kb_dir / "wiki" / "summaries" / "doc.md"
+ target.parent.mkdir(parents=True)
+ target.write_text("before", encoding="utf-8")
+
+ # Simulate a crashed add: snapshot taken, file mutated, but mark_committed
+ # never ran — an ACTIVE journal is left on disk.
+ snapshot_paths(kb_dir, [target], operation="add", details={"doc_name": "doc"})
+ target.write_text("after", encoding="utf-8")
+
+ # Any exclusive-lock holder drains before its body runs.
+ with kb_ingest_lock(openkb_dir):
+ assert target.read_text(encoding="utf-8") == "before"
+
+ assert target.read_text(encoding="utf-8") == "before"
+ assert not any((openkb_dir / "journal").glob("*.json"))