Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/unstract/clone/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ class CloneContext:
# Source prompt_registry_ids whose CustomTool was skipped; used to
# cascade-skip dependent workflows downstream.
skipped_custom_tool_registry_ids: set[str] = field(default_factory=set)
# Source connector ids skipped because they can't be recreated via the API
# (OAuth-backed or redacted metadata) and no same-name target connector
# exists to adopt; workflows whose endpoints use one are cascade-skipped so
# the clone doesn't create pipelines that fail on every run.
skipped_connector_ids: set[str] = field(default_factory=set)
# Per-run memo for users/groups directory listings (sharing replication
# touches them once per endpoint, never per resource).
share_cache: dict[str, Any] = field(default_factory=dict)
Expand Down
161 changes: 102 additions & 59 deletions src/unstract/clone/phases/connector.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
"""Migrate connectors from source org to target org.

Same list -> per-id GET -> POST/adopt pattern as AdapterPhase. Two
connector-specific wrinkles:
Same list -> per-id GET -> adopt/POST pattern as AdapterPhase, but a
same-name target connector is adopted *before* any recreate attempt.
Two connector-specific wrinkles make that ordering matter:

1. **Connectors with redacted metadata are skipped.** Auto-provisioned
1. **Redacted-metadata connectors can't be recreated.** Auto-provisioned
rows (e.g. Unstract Cloud Storage) come back without
``connector_metadata``, so the SDK cannot reconstruct them on the
target. We detect this by inspecting the source GET response:
a falsy ``connector_metadata`` means the operator must rely on the
target's own provisioning (or re-create the row manually) — the
remap table records no entry for these.

2. **OAuth ``connector_auth`` is stripped from responses.** OAuth refresh
tokens are never returned by the API, so OAuth-backed connectors land
on the target without them. Operator must re-authorise on target.
``connector_metadata``, so the SDK has nothing to reconstruct.

2. **OAuth credentials can't be cloned.** OAuth refresh tokens are never
returned by the API, and a token can only be minted by completing the
OAuth flow as the target user — which only happens in the UI.

Neither can be created from source. Both are skipped, their ids recorded
so dependent workflows cascade-skip (avoiding pipelines that fail every
run). The recovery path is the same for both: provision the connector on
the target with the *same name*, then re-run — the adopt-first lookup
picks it up and wires the dependent workflow endpoints.
"""

from __future__ import annotations
Expand Down Expand Up @@ -83,31 +86,6 @@ def _clone_one(
result.errors.append(f"GET source detail {name}: {e}")
return

metadata = src.get("connector_metadata") or {}
if not metadata:
logger.info(
"skipping connector '%s' (src=%s, catalog=%s) — no source metadata",
name,
src_id,
src.get("connector_id"),
)
with lock:
result.skipped += 1
return

if _has_oauth_tokens(metadata):
logger.warning(
"skipping connector '%s' (src=%s, catalog=%s) — OAuth-backed; "
"re-authorise on target after the clone, then re-run to wire "
"dependent workflow endpoints.",
name,
src_id,
src.get("connector_id"),
)
with lock:
result.skipped += 1
return

try:
existing = self.ctx.target.list_connectors(name=name)
except Exception as e:
Expand All @@ -117,6 +95,10 @@ def _clone_one(
result.errors.append(f"GET {name}: {e}")
return

# Adopt a same-name target connector before anything else. This is the
# only way OAuth / redacted-metadata connectors come across: the
# operator provisions one on the target (where OAuth can complete), and
# a re-run adopts it and wires dependent workflow endpoints.
if existing:
tgt = existing[0]
if self.ctx.options.on_name_conflict == "abort":
Expand All @@ -131,34 +113,95 @@ def _clone_one(
src_id,
tgt["id"],
)
elif self.ctx.options.dry_run:
with lock:
result.created += 1
self.ctx.remap.record_planned("connector", src_id)
logger.info("[dry-run] would create connector '%s' src=%s", name, src_id)
return
else:
payload = build_post_payload(src, self._writable)
try:
tgt = self.ctx.target.create_connector(payload)
except Exception as e:
logger.exception("Failed to create connector %s: %s", name, e)
with lock:
result.failed += 1
result.errors.append(f"create {name}: {e}")
tgt = self._recreate_or_skip(src, name, src_id, result, lock)
if tgt is None:
return
with lock:
result.created += 1
logger.info(
"created connector '%s' src=%s -> tgt=%s",
name,
src_id,
tgt["id"],
)

with lock:
self.ctx.remap.record("connector", src_id, tgt["id"])
# Source detail (fetched above) carries the share axes.
self.apply_share(
src=src, tgt_id=tgt["id"], label=name, result=result, lock=lock
)

def _recreate_or_skip(
self,
src: dict[str, Any],
name: str,
src_id: str,
result: PhaseResult,
lock: threading.Lock,
) -> dict[str, Any] | None:
"""Recreate a connector from source, or skip when it can't be.

Returns the target connector dict on create, or ``None`` when the
connector is skipped (no metadata to reconstruct, or OAuth credentials
that the API can't clone). Skipped ids are recorded so dependent
workflows cascade-skip.
"""
metadata = src.get("connector_metadata") or {}
if not metadata:
logger.info(
"skipping connector '%s' (src=%s, catalog=%s) — metadata not "
"exposed by the API (auto-provisioned, e.g. Cloud Storage); "
"provision it on the target with the same name and re-run to adopt",
name,
src_id,
src.get("connector_id"),
)
with lock:
result.skipped += 1
self.ctx.skipped_connector_ids.add(src_id)
result.warnings.append(
f"connector '{name}' skipped — metadata not exposed by the API "
"(auto-provisioned); provision it on target with the same name "
"and re-run to adopt"
)
return None

if _has_oauth_tokens(metadata):
logger.warning(
"skipping connector '%s' (src=%s, catalog=%s) — OAuth-backed; "
"credentials can't be cloned. Recreate + authorise it on the "
"target with the same name, then re-run to adopt and wire "
"dependent workflows.",
name,
src_id,
src.get("connector_id"),
)
with lock:
result.skipped += 1
self.ctx.skipped_connector_ids.add(src_id)
result.warnings.append(
f"connector '{name}' skipped — OAuth-backed, credentials can't "
"be cloned; recreate + authorise on target with the same name, "
"then re-run to adopt"
)
return None

if self.ctx.options.dry_run:
with lock:
result.created += 1
self.ctx.remap.record_planned("connector", src_id)
logger.info("[dry-run] would create connector '%s' src=%s", name, src_id)
return None

payload = build_post_payload(src, self._writable)
try:
tgt = self.ctx.target.create_connector(payload)
except Exception as e:
logger.exception("Failed to create connector %s: %s", name, e)
with lock:
result.failed += 1
result.errors.append(f"create {name}: {e}")
return None
with lock:
result.created += 1
logger.info(
"created connector '%s' src=%s -> tgt=%s",
name,
src_id,
tgt["id"],
)
return tgt
9 changes: 6 additions & 3 deletions src/unstract/clone/phases/custom_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ def _remap_prompts(
if tgt_pid:
self.ctx.remap.record("prompt", sp["prompt_id"], tgt_pid)

def _record_planned_prompts(
self, src_tool_id: str, lock: threading.Lock
) -> None:
def _record_planned_prompts(self, src_tool_id: str, lock: threading.Lock) -> None:
"""Dry-run: record a planned prompt remap per source prompt so
prompt-scoped phases can resolve their FK and plan-count.
"""
Expand Down Expand Up @@ -507,6 +505,11 @@ def _register_frictionless_skip(
src_regs = []
with lock:
result.skipped += 1
result.warnings.append(
f"tool '{tool_name}' skipped — default profile references adapters "
f"not available to clone ({', '.join(missing_adapters)}); wire "
"equivalents on target and re-run"
)
for reg in src_regs:
reg_id = reg.get("prompt_registry_id")
if reg_id:
Expand Down
58 changes: 56 additions & 2 deletions src/unstract/clone/phases/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
WORKFLOW_PATH = "workflow/"


def _endpoint_connector_id(endpoint: dict[str, Any]) -> str | None:
"""``connector_instance`` is a nested dict on the endpoint listing."""
ci = endpoint.get("connector_instance")
if isinstance(ci, dict):
return ci.get("id")
return ci if isinstance(ci, str) else None


class WorkflowPhase(Phase):
name = "workflow"
share_path_template = "workflow/{id}/share/"
Expand All @@ -53,6 +61,7 @@ def run(self, report: CloneReport) -> PhaseResult:

# Built once so per-workflow cascade-skip checks stay O(1).
self._wf_to_src_tool_id = self._collect_wf_tool_map(result)
self._wf_to_connector_ids = self._collect_wf_connector_map()

logger.info("Found %d workflow(s) in source org", len(src_workflows))
self.parallel_map(
Expand Down Expand Up @@ -84,22 +93,67 @@ def _collect_wf_tool_map(self, result: PhaseResult) -> dict[str, str]:
mapping[wf_id] = tool_id
return mapping

def _collect_wf_connector_map(self) -> dict[str, set[str]]:
"""Map source workflow_id to the connector ids its endpoints use,
from a single bulk endpoint listing. Only built when connectors were
skipped, so dependent workflows can cascade-skip.
"""
if not self.ctx.skipped_connector_ids:
return {}
try:
endpoints = self.ctx.source.list_workflow_endpoints()
except Exception as e:
logger.warning(
"workflow phase: failed to list source endpoints for connector "
"cascade-skip (%s); proceeding without cascade",
e,
)
return {}
mapping: dict[str, set[str]] = {}
for ep in endpoints:
wf_id = ep.get("workflow")
conn_id = _endpoint_connector_id(ep)
if wf_id and conn_id:
mapping.setdefault(wf_id, set()).add(conn_id)
return mapping

def _clone_one(
self, src: dict[str, Any], result: PhaseResult, lock: threading.Lock
) -> None:
name = src["workflow_name"]
src_id = src["id"]

# Collect every blocking dependency before skipping, so a workflow
# held up by both a skipped tool and a skipped connector reports both
# reasons in one pass rather than one per re-run.
skip_reasons: list[str] = []
src_tool_id = self._wf_to_src_tool_id.get(src_id)
if src_tool_id and src_tool_id in self.ctx.skipped_custom_tool_registry_ids:
skip_reasons.append(
"its tool was skipped (frictionless adapter); wire equivalents "
"on target and re-run"
)
skipped_conns = (
self._wf_to_connector_ids.get(src_id, set())
& self.ctx.skipped_connector_ids
)
if skipped_conns:
skip_reasons.append(
"depends on un-clonable connector(s) "
f"({', '.join(sorted(skipped_conns))}); provision them on target "
"with the same name and re-run"
)
if skip_reasons:
logger.warning(
"skipping workflow '%s' src=%s — its tool was skipped in "
"custom_tool phase (frictionless adapter dependence)",
"skipping workflow '%s' src=%s — %s",
name,
src_id,
"; ".join(skip_reasons),
)
with lock:
result.skipped += 1
for reason in skip_reasons:
result.warnings.append(f"workflow '{name}' skipped — {reason}")
return

try:
Expand Down
45 changes: 41 additions & 4 deletions tests/clone/test_connector_phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ def test_redacted_metadata_connector_skipped():
assert result.created == 0
assert tgt.posts == []
assert ctx.remap.resolve("connector", "src-ucs") is None
# Recorded for downstream cascade-skip + surfaced in the report.
assert ctx.skipped_connector_ids == {"src-ucs"}
assert any("User Storage" in w for w in result.warnings)


def test_oauth_connector_skipped_before_post():
"""OAuth-backed connectors (metadata carries access_token/refresh_token)
would fail target POST with OAuthTimeOut — skip ahead of POST so the
operator re-authorises post-clone.
def test_oauth_connector_skipped_when_no_target_match():
"""OAuth-backed connectors can't be recreated (credentials can't be cloned)
and aren't on the target yet — skip, record for cascade, warn.
"""
oauth = _src("src-gdrive", "Unstract's google drive")
oauth["connector_metadata"] = {
Expand All @@ -141,6 +143,41 @@ def test_oauth_connector_skipped_before_post():
assert result.failed == 0
assert tgt.posts == []
assert ctx.remap.resolve("connector", "src-gdrive") is None
assert ctx.skipped_connector_ids == {"src-gdrive"}
assert any("OAuth" in w for w in result.warnings)


def test_oauth_connector_adopted_when_target_exists():
"""The recovery path: operator provisioned a same-name connector on the
target (where OAuth completes), so a re-run adopts it instead of skipping —
populating the remap so dependent endpoints wire.
"""
oauth = _src("src-gdrive", "Unstract's google drive")
oauth["connector_metadata"] = {
"provider": "google-oauth2",
"access_token": "ya29.src-access",
"refresh_token": "1//src-refresh",
}
src = FakeClient([oauth])
tgt = FakeClient(
[
{
"id": "tgt-gdrive",
"connector_id": "gdrive|abc",
"connector_name": "Unstract's google drive",
"connector_type": "INPUT",
"connector_metadata": {"refresh_token": "1//tgt-refresh"},
}
]
)
ctx = _ctx(src, tgt, on_name_conflict="adopt")

result = ConnectorPhase(ctx).run(CloneReport())

assert result.adopted == 1
assert result.skipped == 0
assert ctx.skipped_connector_ids == set()
assert ctx.remap.resolve("connector", "src-gdrive") == "tgt-gdrive"


def test_idempotency_zero_creates_on_rerun():
Expand Down
Loading
Loading