Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions src/unstract/clone/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import threading
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from uuid import NAMESPACE_URL, uuid5

if TYPE_CHECKING:
from unstract.clone.client import PlatformClient
Expand Down Expand Up @@ -74,10 +75,30 @@ class RemapTable:

def __init__(self) -> None:
self._table: dict[str, dict[str, str]] = {}
# Synthetic target ids minted by record_planned() in dry-run. Tracked
# so the report can mask them — they are not real target ids.
self._planned: set[str] = set()

def record(self, entity: str, src_uuid: str, tgt_uuid: str) -> None:
self._table.setdefault(entity, {})[src_uuid] = tgt_uuid

def record_planned(self, entity: str, src_uuid: str) -> str:
"""Dry-run only: record a deterministic synthetic target id so
dependent phases can resolve the FK and plan-count without writing.
Never reaches the wire, so the fake id stays in-memory scaffolding.
"""
tgt_uuid = str(uuid5(NAMESPACE_URL, f"planned:{entity}:{src_uuid}"))
self.record(entity, src_uuid, tgt_uuid)
self._planned.add(tgt_uuid)
return tgt_uuid

def is_planned(self, tgt_uuid: str) -> bool:
"""True if ``tgt_uuid`` is a dry-run synthetic id (no real row on
target). Callers use this to skip live target lookups that would
query a non-existent id.
"""
return tgt_uuid in self._planned

def resolve(self, entity: str, src_uuid: str) -> str | None:
return self._table.get(entity, {}).get(src_uuid)

Expand All @@ -90,9 +111,19 @@ def resolve_any(self, src_uuid: str) -> str | None:
return hit
return None

def snapshot(self) -> dict[str, dict[str, str]]:
"""Read-only snapshot for the post-run report."""
return {entity: dict(m) for entity, m in self._table.items()}
def snapshot(self, *, hide_planned: bool = False) -> dict[str, dict[str, str]]:
"""Read-only snapshot for the post-run report. ``hide_planned`` masks
dry-run synthetic ids (rendered as ``"(planned)"``) while keeping the
per-entity counts intact.
"""

def _val(tgt: str) -> str:
return "(planned)" if hide_planned and tgt in self._planned else tgt

return {
entity: {src: _val(tgt) for src, tgt in m.items()}
for entity, m in self._table.items()
}


@dataclass
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def clone(
target=Endpoint(
base_url=target.base_url, organization_id=target.organization_id
),
dry_run=opts.dry_run,
)

run_started = time.perf_counter()
Expand Down Expand Up @@ -112,7 +113,7 @@ def clone(
)

report.total_duration_s = time.perf_counter() - run_started
report.remap_snapshot = ctx.remap.snapshot()
report.remap_snapshot = ctx.remap.snapshot(hide_planned=opts.dry_run)
return report
finally:
src_client.close()
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/phases/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def _clone_one(
)
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("adapter", src_id)
logger.info(
"[dry-run] would create adapter '%s' [%s] src=%s", name, atype, src_id
)
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/phases/api_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def _clone_one(
)
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("api_deployment", src_id)
logger.info(
"[dry-run] would create api_deployment '%s' src=%s", api_name, src_id
)
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/phases/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ def _clone_one(
)
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("connector", src_id)
logger.info("[dry-run] would create connector '%s' src=%s", name, src_id)
return
else:
Expand Down
62 changes: 50 additions & 12 deletions src/unstract/clone/phases/custom_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ def _clone_one(
"tool_name": tool_name,
}

# Each sub-path (adopt / fresh / fresh-dry-run) owns its own
# custom_tool remap, since only it knows whether the target id is
# real or a planned synthetic.
if tgt_tool_id is None:
return

with lock:
self.ctx.remap.record("custom_tool", src_tool_id, tgt_tool_id)

# Neither the export blob nor list rows carry share axes —
# share state comes from the source detail.
self.apply_share(
Expand All @@ -163,6 +163,10 @@ def _clone_one(
)

if self.ctx.options.dry_run:
# Can't republish the registry without writing, but ToolInstance
# needs a prompt_studio_registry remap to plan-count. Mirror it
# with a planned id derived from the source registry (read-only).
self._record_planned_registry(src_tool_id, tool_name, lock)
return

# Tools never exported on source (e.g. empty projects — backend
Expand Down Expand Up @@ -223,6 +227,30 @@ def _clone_one(
tgt_regs[0]["prompt_registry_id"],
)

def _record_planned_registry(
self, src_tool_id: str, tool_name: str, lock: threading.Lock
) -> None:
"""Dry-run: record a planned prompt_studio_registry remap from the
source registry id, so ToolInstancePhase can resolve tool_id and
plan-count. No-op for tools never exported on source (no registry).
"""
try:
src_regs = self.ctx.source.list_registries(custom_tool=src_tool_id)
except Exception as e:
logger.warning(
"[dry-run] source registry lookup failed for tool '%s' "
"(tool_instance plan may under-count): %s",
tool_name,
e,
)
return
if not src_regs:
return
with lock:
self.ctx.remap.record_planned(
"prompt_studio_registry", src_regs[0]["prompt_registry_id"]
)

def _adopt(
self,
match: dict[str, Any],
Expand All @@ -240,7 +268,8 @@ def _adopt(
tgt_tool_id = match["tool_id"]
if self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.adopted += 1
self.ctx.remap.record("custom_tool", src_tool_id, tgt_tool_id)
logger.info(
"[dry-run] would sync prompts into adopted tool '%s' src=%s -> tgt=%s",
tool_name,
Expand All @@ -260,6 +289,7 @@ def _adopt(

with lock:
result.adopted += 1
self.ctx.remap.record("custom_tool", src_tool_id, tgt_tool_id)
logger.info(
"adopted tool '%s' src=%s -> tgt=%s (prompts re-synced)",
tool_name,
Expand All @@ -276,14 +306,9 @@ def _create_fresh(
result: PhaseResult,
lock: threading.Lock,
) -> str | None:
if self.ctx.options.dry_run:
with lock:
result.skipped += 1
logger.info(
"[dry-run] would import tool '%s' src=%s", tool_name, src_tool_id
)
return None

# Run the source-side validations even in dry-run — they decide
# whether a real run would create or frictionless-skip, so the plan
# counts must reflect them. Only the target-write steps are stubbed.
default_profile = self._source_default_profile(src_tool_id, tool_name)
if default_profile is None:
with lock:
Expand All @@ -300,6 +325,18 @@ def _create_fresh(
)
return None

if self.ctx.options.dry_run:
# Target adapter resolution is skipped: adapters this run would
# create don't exist on target yet, so it can't resolve. The
# frictionless check above already caught the real skip cases.
with lock:
result.created += 1
tgt_tool_id = self.ctx.remap.record_planned("custom_tool", src_tool_id)
logger.info(
"[dry-run] would import tool '%s' src=%s", tool_name, src_tool_id
)
return tgt_tool_id
Comment thread
greptile-apps[bot] marked this conversation as resolved.

adapter_ids = self._resolve_target_adapter_ids(default_profile, tool_name)
if adapter_ids is None:
with lock:
Expand All @@ -321,6 +358,7 @@ def _create_fresh(
tgt_tool_id = tgt["tool_id"]
with lock:
result.created += 1
self.ctx.remap.record("custom_tool", src_tool_id, tgt_tool_id)
logger.info(
"created tool '%s' src=%s -> tgt=%s (needs_adapter_config=%s)",
tool_name,
Expand Down
31 changes: 18 additions & 13 deletions src/unstract/clone/phases/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,23 @@ def _build_tool_tasks(
report: CloneReport,
result: PhaseResult,
) -> list[_FileTask]:
try:
tgt_docs = self.ctx.target.list_prompt_documents(tgt_tool_id)
except Exception as e:
logger.exception(
"files: failed to list target DM rows for tool %s: %s",
tool_name,
e,
)
result.failed += 1
result.errors.append(f"list target docs {tool_name}: {e}")
return []
target_names = {d["document_name"] for d in tgt_docs}
# A planned (dry-run) tool id has no row on target; skip the live
# lookup and treat every source file as missing → predicted upload.
if self.ctx.options.dry_run and self.ctx.remap.is_planned(tgt_tool_id):
target_names: set[str] = set()
else:
try:
tgt_docs = self.ctx.target.list_prompt_documents(tgt_tool_id)
except Exception as e:
logger.exception(
"files: failed to list target DM rows for tool %s: %s",
tool_name,
e,
)
result.failed += 1
result.errors.append(f"list target docs {tool_name}: {e}")
return []
target_names = {d["document_name"] for d in tgt_docs}

tasks: list[_FileTask] = []
for doc in src_docs:
Expand All @@ -163,7 +168,7 @@ def _build_tool_tasks(
)
continue
if self.ctx.options.dry_run:
result.skipped += 1
result.created += 1
logger.info(
"[dry-run] files: would clone tool=%s file=%s",
tool_name,
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/phases/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def _clone_one(
logger.info("reusing group '%s' src=%s -> tgt=%s", name, src_id, tgt["id"])
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("group", str(src_id))
logger.info("[dry-run] would create group '%s' src=%s", name, src_id)
if self.ctx.options.clone_group_members:
# Still computed so would-skip members show up in the report.
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/phases/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def _clone_one(
)
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("pipeline", src_id)
logger.info("[dry-run] would create pipeline '%s' src=%s", name, src_id)
return
else:
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/phases/tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def _clone_one(
logger.info("adopted tag '%s' src=%s -> tgt=%s", name, src_id, tgt["id"])
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("tag", src_id)
logger.info("[dry-run] would create tag '%s' src=%s", name, src_id)
return
else:
Expand Down
33 changes: 20 additions & 13 deletions src/unstract/clone/phases/tool_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,31 @@ def _clone_workflow_tools(
result.skipped += 1
return

try:
existing = self.ctx.target.list_tool_instances(workflow_id=tgt_wf_id)
except Exception as e:
logger.exception(
"Failed to list target tool_instances for wf %s: %s", tgt_wf_id, e
)
with lock:
result.failed += 1
result.errors.append(f"list tgt tool_instances {tgt_wf_id}: {e}")
return
# A planned (dry-run) workflow id has no row on target; its
# tool_instance can't exist yet, so predict a create without the
# live lookup against a non-existent id.
if self.ctx.options.dry_run and self.ctx.remap.is_planned(tgt_wf_id):
existing: list[dict[str, Any]] = []
else:
try:
existing = self.ctx.target.list_tool_instances(workflow_id=tgt_wf_id)
except Exception as e:
logger.exception(
"Failed to list target tool_instances for wf %s: %s", tgt_wf_id, e
)
with lock:
result.failed += 1
result.errors.append(f"list tgt tool_instances {tgt_wf_id}: {e}")
return

if existing:
tgt_ti = existing[0]
if self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.adopted += 1
self.ctx.remap.record("tool_instance", src_ti_id, tgt_ti["id"])
logger.info(
"[dry-run] would re-PATCH metadata on adopted tool_instance "
"[dry-run] would adopt tool_instance (metadata PATCH skipped) "
"src=%s -> tgt=%s (workflow %s)",
src_ti_id,
tgt_ti["id"],
Expand All @@ -160,7 +166,8 @@ def _clone_workflow_tools(
)
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("tool_instance", src_ti_id)
logger.info(
"[dry-run] would create tool_instance for tgt workflow %s "
"(src tool_instance %s)",
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/clone/phases/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ def _clone_one(
)
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
result.created += 1
self.ctx.remap.record_planned("workflow", src_id)
logger.info("[dry-run] would create workflow '%s' src=%s", name, src_id)
return
else:
Expand Down
Loading
Loading