diff --git a/api/projects.py b/api/projects.py index bea220f..be98c16 100644 --- a/api/projects.py +++ b/api/projects.py @@ -5,31 +5,23 @@ from api._flask_types import FlaskReturn, json_response from api.error_codes import ErrorCode, error_response from models.project import ProjectSessionRowDict, SessionListItemDict -from models.session import SessionDict from utils.exclusion_rules import is_session_excluded +from utils.jsonl_parser import quick_session_info from utils.session_cache import get_cached_session from utils.session_path import get_claude_projects_dir, list_projects, list_sessions, safe_join +from utils.session_summary_cache import ( + SummaryCacheRowDict, + get_summary, + put_summary, + rules_fingerprint, + session_row_from_summary, + summary_from_peek, + summary_from_session, +) projects_bp = Blueprint("projects", __name__) -def _session_row_ok(s: SessionListItemDict, parsed: SessionDict) -> ProjectSessionRowDict: - meta = parsed["metadata"] - models = meta.get("models_used", []) - return { - "id": s["id"], - "path": s["path"], - "size_bytes": s["size_bytes"], - "modified": s["modified"], - "title": parsed["title"], - "models": sorted(models) if isinstance(models, set) else list(models), - "tokens": meta["total_input_tokens"] + meta["total_output_tokens"], - "tool_calls": meta["total_tool_calls"], - "first_timestamp": meta["first_timestamp"], - "last_timestamp": meta["last_timestamp"], - } - - def _session_row_error(s: SessionListItemDict) -> ProjectSessionRowDict: return { "id": s["id"], @@ -41,15 +33,32 @@ def _session_row_error(s: SessionListItemDict) -> ProjectSessionRowDict: } +def _peek_or_cache_summary(path: str, mtime: float, rules_fp: str) -> SummaryCacheRowDict: + """Return a cached summary row (any completeness) or peek the file and store a partial row. + + Used by get_projects for fast landing-page counts. Partial rows (is_complete=False) + are acceptable here — is_untitled is derived from the same first-user-text peek that + quick_session_info uses; peek and full-parse agree for the vast majority of sessions + (first user message within the first 80 lines). The session list path always upgrades + to a complete row via get_cached_session, so session_count and list count align after + the first session-list visit. With no exclusion rules the counts are identical on first + visit too, because is_excluded is always False for both paths. + """ + cached = get_summary(path, mtime, rules_fp) + if cached is not None: + return cached + info = quick_session_info(path) + row = summary_from_peek(info) + put_summary(path, mtime, rules_fp, row) + return row + + @projects_bp.route("/api/projects") def get_projects() -> FlaskReturn: base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() projects = list_projects(base) - - # Enrich each project with accurate titled-session count and latest timestamp - # so the landing page matches what the workspace page shows. - # Uses quick_session_info() which peeks at files without full parsing. - from utils.jsonl_parser import quick_session_info + rules = current_app.config.get("EXCLUSION_RULES") or [] + rules_fp = rules_fingerprint(rules) for project in projects: sessions = list_sessions(project["path"]) @@ -57,14 +66,20 @@ def get_projects() -> FlaskReturn: latest_ts = None for s in sessions: try: - info = quick_session_info(s["path"]) - if info["title"] == "Untitled Session": + row = _peek_or_cache_summary(s["path"], s["modified"], rules_fp) + if row["is_untitled"]: + continue + if row["is_complete"] and row["is_excluded"]: continue titled_count += 1 - ts = info.get("last_timestamp") or info.get("first_timestamp") + ts = row.get("last_timestamp") or row.get("first_timestamp") if ts and (latest_ts is None or ts > latest_ts): latest_ts = ts except Exception: + current_app.logger.exception( + "Failed to peek session summary for project %s", + project["name"], + ) titled_count += 1 project["session_count"] = titled_count if latest_ts: @@ -82,21 +97,25 @@ def get_project_sessions(project_name: str) -> FlaskReturn: return error_response(ErrorCode.INVALID_PATH, "Invalid path", 400) sessions = list_sessions(project_dir) rules = current_app.config.get("EXCLUSION_RULES") or [] + rules_fp = rules_fingerprint(rules) result: list[ProjectSessionRowDict] = [] for s in sessions: try: - parsed = get_cached_session(s["path"]) - # Skip untitled sessions (no real conversation) - if parsed["title"] == "Untitled Session": + cached = get_summary(s["path"], s["modified"], rules_fp) + if cached is not None and cached["is_complete"]: + if cached["is_untitled"] or cached["is_excluded"]: + continue + result.append(session_row_from_summary(s, cached)) continue - if is_session_excluded(rules, parsed, project_name): + + parsed = get_cached_session(s["path"]) + excluded = is_session_excluded(rules, parsed, project_name) + row = summary_from_session(parsed, is_excluded=excluded) + put_summary(s["path"], s["modified"], rules_fp, row) + if row["is_untitled"] or excluded: continue - result.append(_session_row_ok(s, parsed)) + result.append(session_row_from_summary(s, row)) except Exception: - # Full detail (class, message, traceback) to the server log via - # logger.exception. The per-session card carries only `error: True` - # — the class-name+message string was a leak (issue #25). The - # operator looks at the server log for triage. current_app.logger.exception("Failed to parse session %s", s["id"]) result.append(_session_row_error(s)) return json_response(result) diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py index 9500d55..f1124dd 100644 --- a/tests/test_api_integration.py +++ b/tests/test_api_integration.py @@ -9,6 +9,8 @@ from __future__ import annotations +import pytest + from app import CSP_POLICY from tests.conftest import assert_error_response as _assert_error_shape @@ -124,3 +126,46 @@ def test_search_valid_limit(client): results = resp.get_json() assert isinstance(results, list) assert len(results) <= 5 + + +# --- session summary cache (disk) --- + + +@pytest.fixture +def summary_cache_db(tmp_path, monkeypatch): + from utils.session_summary_cache import clear_cache, reset_connection_for_tests + + db = tmp_path / "session_summary_cache.sqlite" + reset_connection_for_tests(db) + yield db + clear_cache() + + +def test_project_session_count_matches_list(client, summary_cache_db): + """Card count and session list agree when no exclusion rules are active. + + get_projects uses peek (partial row); get_project_sessions uses full parse. + Both filter on is_untitled, and with no rules is_excluded is always False, + so counts align — this is the alignment guarantee from issue #109. + """ + # Hit session list first so disk cache is warm with complete rows. + sessions = client.get("/api/projects/test-project/sessions").get_json() + projects = client.get("/api/projects").get_json() + project = next(p for p in projects if p["name"] == "test-project") + assert project["session_count"] == len(sessions) + + +def test_project_sessions_uses_disk_cache_on_second_request(client, summary_cache_db, monkeypatch): + client.get("/api/projects/test-project/sessions") + calls = 0 + + def counting_get_cached(path: str): + nonlocal calls + calls += 1 + from utils.session_cache import get_cached_session as real_get + + return real_get(path) + + monkeypatch.setattr("api.projects.get_cached_session", counting_get_cached) + client.get("/api/projects/test-project/sessions") + assert calls == 0 diff --git a/tests/test_session_path.py b/tests/test_session_path.py index 1662cc2..b7e1996 100644 --- a/tests/test_session_path.py +++ b/tests/test_session_path.py @@ -39,3 +39,30 @@ def test_get_claude_projects_dir_on_windows_runner( got = session_path.get_claude_projects_dir() expected = os.path.join(str(profile), ".claude", "projects") assert got == expected + + +def test_display_name_cache_avoids_repeat_file_reads( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + session_path.clear_display_name_cache() + project_dir = tmp_path / "proj-hash" + project_dir.mkdir() + jsonl = project_dir / "session.jsonl" + jsonl.write_text( + '{"type":"user","cwd":"/home/user/MyProject","timestamp":"2026-01-01T00:00:00Z"}\n', + encoding="utf-8", + ) + calls = 0 + real_get = session_path._get_display_name + + def counting_get_display_name(*args, **kwargs): + nonlocal calls + calls += 1 + return real_get(*args, **kwargs) + + monkeypatch.setattr(session_path, "_get_display_name", counting_get_display_name) + session_path.list_projects(str(tmp_path)) + first_calls = calls + session_path.list_projects(str(tmp_path)) + assert first_calls > 0 + assert calls == first_calls diff --git a/tests/test_session_summary_cache.py b/tests/test_session_summary_cache.py new file mode 100644 index 0000000..74f4167 --- /dev/null +++ b/tests/test_session_summary_cache.py @@ -0,0 +1,190 @@ +"""Unit tests for utils.session_summary_cache.""" + +from __future__ import annotations + +import os +import shutil +from collections.abc import Iterator +from pathlib import Path + +import pytest + +from utils.jsonl_parser import parse_session, quick_session_info +from utils.session_summary_cache import ( + clear_cache, + get_summary, + put_summary, + reset_connection_for_tests, + rules_fingerprint, + summary_from_peek, + summary_from_session, +) + +FIXTURES = Path(__file__).resolve().parent / "fixtures" +SAMPLE_SESSION = FIXTURES / "session_with_tools.jsonl" + + +@pytest.fixture +def sample_session(tmp_path: Path) -> Path: + dest = tmp_path / "session.jsonl" + shutil.copy(SAMPLE_SESSION, dest) + return dest + + +@pytest.fixture +def cache_db(tmp_path: Path) -> Iterator[Path]: + db = tmp_path / "summary.sqlite" + reset_connection_for_tests(db) + yield db + clear_cache() + + +def test_rules_fingerprint_empty() -> None: + assert rules_fingerprint([]) == "none" + + +def test_max_cache_rows_invalid_env_falls_back( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SUMMARY_CACHE_MAX_ROWS", "not-a-number") + from utils.session_summary_cache import DEFAULT_MAX_ROWS, max_cache_rows + + assert max_cache_rows() == DEFAULT_MAX_ROWS + + +def test_max_cache_rows_valid_env_enforces_minimum( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SUMMARY_CACHE_MAX_ROWS", "0") + from utils.session_summary_cache import max_cache_rows + + assert max_cache_rows() == 1 + + +def test_rules_fingerprint_stable() -> None: + rules = [[("word", "secret")]] + assert rules_fingerprint(rules) == rules_fingerprint(rules) + + +def test_cache_miss_returns_none(sample_session: Path, cache_db: Path) -> None: + path = str(sample_session) + mtime = sample_session.stat().st_mtime + assert get_summary(path, mtime, "none") is None + + +def test_cache_hit_round_trip(sample_session: Path, cache_db: Path) -> None: + path = str(sample_session) + mtime = sample_session.stat().st_mtime + parsed = parse_session(path) + row = summary_from_session(parsed, is_excluded=False) + put_summary(path, mtime, "none", row) + hit = get_summary(path, mtime, "none") + assert hit is not None + assert hit["title"] == row["title"] + assert hit["tokens"] == row["tokens"] + assert hit["is_complete"] is True + + +def test_cache_invalidates_on_mtime_change(sample_session: Path, cache_db: Path) -> None: + path = str(sample_session) + mtime = sample_session.stat().st_mtime + parsed = parse_session(path) + put_summary(path, mtime, "none", summary_from_session(parsed, is_excluded=False)) + stat = sample_session.stat() + os.utime(sample_session, (stat.st_mtime + 10, stat.st_mtime + 10)) + new_mtime = sample_session.stat().st_mtime + assert new_mtime != mtime + assert get_summary(path, new_mtime, "none") is None + + +def test_exclusion_key_separation(sample_session: Path, cache_db: Path) -> None: + path = str(sample_session) + mtime = sample_session.stat().st_mtime + parsed = parse_session(path) + put_summary(path, mtime, "none", summary_from_session(parsed, is_excluded=False)) + put_summary(path, mtime, "rules_a", summary_from_session(parsed, is_excluded=True)) + hit_none = get_summary(path, mtime, "none") + hit_rules = get_summary(path, mtime, "rules_a") + assert hit_none is not None and hit_none["is_excluded"] is False + assert hit_rules is not None and hit_rules["is_excluded"] is True + + +def test_peek_partial_row(sample_session: Path, cache_db: Path) -> None: + path = str(sample_session) + mtime = sample_session.stat().st_mtime + row = summary_from_peek(quick_session_info(path)) + put_summary(path, mtime, "none", row) + hit = get_summary(path, mtime, "none") + assert hit is not None + assert hit["is_complete"] is False + assert hit["tokens"] == 0 + + +def test_lru_eviction( + sample_session: Path, cache_db: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr("utils.session_summary_cache.DEFAULT_MAX_ROWS", 2) + clock = {"t": 100.0} + + def fake_time() -> float: + clock["t"] += 100.0 + return clock["t"] + + monkeypatch.setattr("utils.session_summary_cache.time.time", fake_time) + + content = sample_session.read_text(encoding="utf-8") + paths = [] + for name in ("a.jsonl", "b.jsonl", "c.jsonl"): + p = sample_session.parent / name + p.write_text(content, encoding="utf-8") + paths.append(p) + + for p in paths[:2]: + mtime = p.stat().st_mtime + put_summary( + str(p), + mtime, + "none", + summary_from_peek(quick_session_info(str(p))), + ) + + first = paths[0] + first_mtime = first.stat().st_mtime + assert get_summary(str(first), first_mtime, "none") is not None + + third = paths[2] + third_mtime = third.stat().st_mtime + put_summary( + str(third), + third_mtime, + "none", + summary_from_peek(quick_session_info(str(third))), + ) + + second = paths[1] + assert get_summary(str(second), second.stat().st_mtime, "none") is None + assert get_summary(str(first), first_mtime, "none") is not None + assert get_summary(str(third), third_mtime, "none") is not None + + +def test_put_summary_drops_stale_mtime_rows(sample_session: Path, cache_db: Path) -> None: + path = str(sample_session) + mtime = sample_session.stat().st_mtime + parsed = parse_session(path) + put_summary(path, mtime, "none", summary_from_session(parsed, is_excluded=False)) + stat = sample_session.stat() + os.utime(sample_session, (stat.st_mtime + 10, stat.st_mtime + 10)) + new_mtime = sample_session.stat().st_mtime + put_summary(path, new_mtime, "none", summary_from_session(parsed, is_excluded=False)) + assert get_summary(path, mtime, "none") is None + assert get_summary(path, new_mtime, "none") is not None + + +def test_clear_cache(sample_session: Path, cache_db: Path) -> None: + path = str(sample_session) + mtime = sample_session.stat().st_mtime + parsed = parse_session(path) + put_summary(path, mtime, "none", summary_from_session(parsed, is_excluded=False)) + clear_cache() + reset_connection_for_tests(cache_db) + assert get_summary(path, mtime, "none") is None diff --git a/utils/session_path.py b/utils/session_path.py index a3f6529..13b95e5 100644 --- a/utils/session_path.py +++ b/utils/session_path.py @@ -5,11 +5,15 @@ import logging import os import platform +import threading from models.project import ProjectDict, SessionListItemDict _logger = logging.getLogger(__name__) +_display_name_cache: dict[str, tuple[float, str]] = {} +_display_name_lock = threading.Lock() + def safe_join(base: str, *parts: str) -> str: """Join path components and verify the result stays under base. @@ -31,6 +35,30 @@ def get_claude_projects_dir() -> str: return os.path.join(home, ".claude", "projects") +def clear_display_name_cache() -> None: + """Clear the in-memory display-name cache (for tests).""" + with _display_name_lock: + _display_name_cache.clear() + + +def _resolve_display_name( + project_dir: str, jsonl_files: list[str], fallback: str, max_mtime: float +) -> str: + with _display_name_lock: + hit = _display_name_cache.get(project_dir) + if hit is not None and hit[0] == max_mtime: + return hit[1] + display_name = fallback + for jf in jsonl_files: + candidate = _get_display_name(os.path.join(project_dir, jf), None) + if candidate is not None: + display_name = candidate + break + with _display_name_lock: + _display_name_cache[project_dir] = (max_mtime, display_name) + return display_name + + def list_projects(base_dir: str | None = None) -> list[ProjectDict]: """Scan the projects dir and return info for each one that has .jsonl files.""" base = base_dir or get_claude_projects_dir() @@ -52,13 +80,7 @@ def list_projects(base_dir: str | None = None) -> list[ProjectDict]: from datetime import datetime, timezone last_modified = datetime.fromtimestamp(latest_mtime, tz=timezone.utc).isoformat() - # Read cwd from sessions to get the real project path - display_name = name - for jf in jsonl_files: - candidate = _get_display_name(os.path.join(project_dir, jf), None) - if candidate is not None: - display_name = candidate - break + display_name = _resolve_display_name(project_dir, jsonl_files, name, latest_mtime) projects.append( { "name": name, diff --git a/utils/session_summary_cache.py b/utils/session_summary_cache.py new file mode 100644 index 0000000..b4e6771 --- /dev/null +++ b/utils/session_summary_cache.py @@ -0,0 +1,259 @@ +"""Disk-backed session list summary cache with mtime and rules fingerprint keys.""" + +from __future__ import annotations + +import hashlib +import json +import os +import sqlite3 +import threading +import time +from pathlib import Path +from typing import Any, TypedDict + +from models.project import ProjectSessionRowDict, SessionListItemDict +from models.session import QuickSessionInfoDict, SessionDict + +DEFAULT_MAX_ROWS = 2000 + + +def max_cache_rows() -> int: + """Return LRU capacity (override via CLAUDE_CODE_CHAT_BROWSER_SUMMARY_CACHE_MAX_ROWS).""" + raw = os.environ.get("CLAUDE_CODE_CHAT_BROWSER_SUMMARY_CACHE_MAX_ROWS", "").strip() + if not raw: + return DEFAULT_MAX_ROWS + try: + return max(1, int(raw)) + except ValueError: + return DEFAULT_MAX_ROWS + + +_lock = threading.Lock() +_conn: sqlite3.Connection | None = None + + +class SummaryCacheRowDict(TypedDict): + title: str + models: list[str] + tokens: int + tool_calls: int + first_timestamp: str | None + last_timestamp: str | None + is_excluded: bool + is_untitled: bool + is_complete: bool + + +def cache_db_path() -> Path: + """Return SQLite path for the summary cache (overridable in tests).""" + override = os.environ.get("CLAUDE_CODE_CHAT_BROWSER_SUMMARY_CACHE", "").strip() + if override: + return Path(override) + return Path.home() / ".claude-code-chat-browser" / "session_summary_cache.sqlite" + + +def rules_fingerprint(rules: list[Any]) -> str: + """Stable short hash for exclusion rules loaded at process start.""" + if not rules: + return "none" + payload = json.dumps(rules, sort_keys=True, default=str) + return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16] + + +def _ensure_connection() -> sqlite3.Connection: + global _conn + if _conn is not None: + return _conn + path = cache_db_path() + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(path), check_same_thread=False, timeout=30.0) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS summary_cache ( + path TEXT NOT NULL, + mtime REAL NOT NULL, + rules_fp TEXT NOT NULL, + payload TEXT NOT NULL, + accessed_at REAL NOT NULL, + PRIMARY KEY (path, mtime, rules_fp) + ) + """ + ) + conn.commit() + _conn = conn + return conn + + +def _row_to_payload(row: SummaryCacheRowDict) -> str: + return json.dumps(row, ensure_ascii=False) + + +def _payload_to_row(raw: str) -> SummaryCacheRowDict: + data = json.loads(raw) + return SummaryCacheRowDict( + title=str(data["title"]), + models=list(data.get("models") or []), + tokens=int(data.get("tokens") or 0), + tool_calls=int(data.get("tool_calls") or 0), + first_timestamp=data.get("first_timestamp"), + last_timestamp=data.get("last_timestamp"), + is_excluded=bool(data.get("is_excluded")), + is_untitled=bool(data.get("is_untitled")), + is_complete=bool(data.get("is_complete", True)), + ) + + +def get_summary(path: str, mtime: float, rules_fingerprint: str) -> SummaryCacheRowDict | None: + """Return a cached summary row when path, mtime, and rules_fp match.""" + abspath = os.path.abspath(path) + with _lock: + conn = _ensure_connection() + row = conn.execute( + "SELECT payload FROM summary_cache WHERE path = ? AND mtime = ? AND rules_fp = ?", + (abspath, mtime, rules_fingerprint), + ).fetchone() + if row is None: + return None + conn.execute( + """ + UPDATE summary_cache + SET accessed_at = ? + WHERE path = ? AND mtime = ? AND rules_fp = ? + """, + (time.time(), abspath, mtime, rules_fingerprint), + ) + conn.commit() + return _payload_to_row(str(row[0])) + + +def put_summary( + path: str, + mtime: float, + rules_fingerprint: str, + row: SummaryCacheRowDict, +) -> None: + """Store or replace a summary row and evict LRU entries when over capacity.""" + abspath = os.path.abspath(path) + now = time.time() + payload = _row_to_payload(row) + with _lock: + conn = _ensure_connection() + conn.execute( + "DELETE FROM summary_cache WHERE path = ? AND rules_fp = ? AND mtime != ?", + (abspath, rules_fingerprint, mtime), + ) + conn.execute( + """ + INSERT INTO summary_cache (path, mtime, rules_fp, payload, accessed_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(path, mtime, rules_fp) DO UPDATE SET + payload = excluded.payload, + accessed_at = excluded.accessed_at + """, + (abspath, mtime, rules_fingerprint, payload, now), + ) + count = conn.execute("SELECT COUNT(*) FROM summary_cache").fetchone() + limit = max_cache_rows() + if count is not None and int(count[0]) > limit: + conn.execute( + """ + DELETE FROM summary_cache + WHERE rowid IN ( + SELECT rowid FROM summary_cache + ORDER BY accessed_at ASC + LIMIT ? + ) + """, + (int(count[0]) - limit,), + ) + conn.commit() + + +def clear_cache() -> None: + """Clear all cached summaries and close the connection (for tests).""" + global _conn + with _lock: + if _conn is not None: + _conn.execute("DELETE FROM summary_cache") + _conn.commit() + _conn.close() + _conn = None + + +def summary_from_peek(info: QuickSessionInfoDict) -> SummaryCacheRowDict: + """Build a partial summary row from a lightweight JSONL peek. + + ``is_excluded`` is hard-coded ``False`` because peek cannot evaluate exclusion + rules without reading full message bodies. Callers that need accurate exclusion + (``get_project_sessions``) must check ``is_complete`` and fall back to a full + parse. ``get_projects`` uses partial rows for fast card counts: with no + exclusion rules the value is correct; with rules active, excluded sessions may + be counted until the session list upgrades the row to a complete entry — this + is an accepted trade-off documented in issue #109. + """ + title = info["title"] + return SummaryCacheRowDict( + title=title, + models=[], + tokens=0, + tool_calls=0, + first_timestamp=info.get("first_timestamp"), + last_timestamp=info.get("last_timestamp"), + is_excluded=False, + is_untitled=title == "Untitled Session", + is_complete=False, + ) + + +def summary_from_session( + parsed: SessionDict, + *, + is_excluded: bool, +) -> SummaryCacheRowDict: + """Build a complete summary row from a parsed session.""" + meta = parsed["metadata"] + models = meta.get("models_used", []) + title = parsed["title"] + return SummaryCacheRowDict( + title=title, + models=sorted(models) if isinstance(models, set) else list(models), + tokens=meta["total_input_tokens"] + meta["total_output_tokens"], + tool_calls=meta["total_tool_calls"], + first_timestamp=meta["first_timestamp"], + last_timestamp=meta["last_timestamp"], + is_excluded=is_excluded, + is_untitled=title == "Untitled Session", + is_complete=True, + ) + + +def session_row_from_summary( + s: SessionListItemDict, + row: SummaryCacheRowDict, +) -> ProjectSessionRowDict: + """Map a cached summary onto the API session list row shape.""" + return ProjectSessionRowDict( + id=s["id"], + path=s["path"], + size_bytes=s["size_bytes"], + modified=s["modified"], + title=row["title"], + models=row["models"], + tokens=row["tokens"], + tool_calls=row["tool_calls"], + first_timestamp=row["first_timestamp"], + last_timestamp=row["last_timestamp"], + ) + + +def reset_connection_for_tests(db_path: Path) -> None: + """Point the cache at *db_path* and open a fresh connection.""" + global _conn + with _lock: + if _conn is not None: + _conn.close() + _conn = None + os.environ["CLAUDE_CODE_CHAT_BROWSER_SUMMARY_CACHE"] = str(db_path) + _ensure_connection()