diff --git a/src/unstract/clone/context.py b/src/unstract/clone/context.py index aeef4c3..fe73011 100644 --- a/src/unstract/clone/context.py +++ b/src/unstract/clone/context.py @@ -141,6 +141,11 @@ class CloneContext: # Source prompt_registry_ids whose CustomTool was skipped; used to # cascade-skip dependent workflows downstream. skipped_custom_tool_registry_ids: set[str] = field(default_factory=set) + # Source connector ids skipped because they can't be recreated via the API + # (OAuth-backed or redacted metadata) and no same-name target connector + # exists to adopt; workflows whose endpoints use one are cascade-skipped so + # the clone doesn't create pipelines that fail on every run. + skipped_connector_ids: set[str] = field(default_factory=set) # Per-run memo for users/groups directory listings (sharing replication # touches them once per endpoint, never per resource). share_cache: dict[str, Any] = field(default_factory=dict) diff --git a/src/unstract/clone/phases/connector.py b/src/unstract/clone/phases/connector.py index c593e42..97a6971 100644 --- a/src/unstract/clone/phases/connector.py +++ b/src/unstract/clone/phases/connector.py @@ -1,19 +1,22 @@ """Migrate connectors from source org to target org. -Same list -> per-id GET -> POST/adopt pattern as AdapterPhase. Two -connector-specific wrinkles: +Same list -> per-id GET -> adopt/POST pattern as AdapterPhase, but a +same-name target connector is adopted *before* any recreate attempt. +Two connector-specific wrinkles make that ordering matter: -1. **Connectors with redacted metadata are skipped.** Auto-provisioned +1. **Redacted-metadata connectors can't be recreated.** Auto-provisioned rows (e.g. Unstract Cloud Storage) come back without - ``connector_metadata``, so the SDK cannot reconstruct them on the - target. We detect this by inspecting the source GET response: - a falsy ``connector_metadata`` means the operator must rely on the - target's own provisioning (or re-create the row manually) — the - remap table records no entry for these. - -2. **OAuth ``connector_auth`` is stripped from responses.** OAuth refresh - tokens are never returned by the API, so OAuth-backed connectors land - on the target without them. Operator must re-authorise on target. + ``connector_metadata``, so the SDK has nothing to reconstruct. + +2. **OAuth credentials can't be cloned.** OAuth refresh tokens are never + returned by the API, and a token can only be minted by completing the + OAuth flow as the target user — which only happens in the UI. + +Neither can be created from source. Both are skipped, their ids recorded +so dependent workflows cascade-skip (avoiding pipelines that fail every +run). The recovery path is the same for both: provision the connector on +the target with the *same name*, then re-run — the adopt-first lookup +picks it up and wires the dependent workflow endpoints. """ from __future__ import annotations @@ -83,31 +86,6 @@ def _clone_one( result.errors.append(f"GET source detail {name}: {e}") return - metadata = src.get("connector_metadata") or {} - if not metadata: - logger.info( - "skipping connector '%s' (src=%s, catalog=%s) — no source metadata", - name, - src_id, - src.get("connector_id"), - ) - with lock: - result.skipped += 1 - return - - if _has_oauth_tokens(metadata): - logger.warning( - "skipping connector '%s' (src=%s, catalog=%s) — OAuth-backed; " - "re-authorise on target after the clone, then re-run to wire " - "dependent workflow endpoints.", - name, - src_id, - src.get("connector_id"), - ) - with lock: - result.skipped += 1 - return - try: existing = self.ctx.target.list_connectors(name=name) except Exception as e: @@ -117,6 +95,10 @@ def _clone_one( result.errors.append(f"GET {name}: {e}") return + # Adopt a same-name target connector before anything else. This is the + # only way OAuth / redacted-metadata connectors come across: the + # operator provisions one on the target (where OAuth can complete), and + # a re-run adopts it and wires dependent workflow endpoints. if existing: tgt = existing[0] if self.ctx.options.on_name_conflict == "abort": @@ -131,30 +113,10 @@ def _clone_one( src_id, tgt["id"], ) - elif self.ctx.options.dry_run: - with lock: - result.created += 1 - self.ctx.remap.record_planned("connector", src_id) - logger.info("[dry-run] would create connector '%s' src=%s", name, src_id) - return else: - payload = build_post_payload(src, self._writable) - try: - tgt = self.ctx.target.create_connector(payload) - except Exception as e: - logger.exception("Failed to create connector %s: %s", name, e) - with lock: - result.failed += 1 - result.errors.append(f"create {name}: {e}") + tgt = self._recreate_or_skip(src, name, src_id, result, lock) + if tgt is None: return - with lock: - result.created += 1 - logger.info( - "created connector '%s' src=%s -> tgt=%s", - name, - src_id, - tgt["id"], - ) with lock: self.ctx.remap.record("connector", src_id, tgt["id"]) @@ -162,3 +124,84 @@ def _clone_one( self.apply_share( src=src, tgt_id=tgt["id"], label=name, result=result, lock=lock ) + + def _recreate_or_skip( + self, + src: dict[str, Any], + name: str, + src_id: str, + result: PhaseResult, + lock: threading.Lock, + ) -> dict[str, Any] | None: + """Recreate a connector from source, or skip when it can't be. + + Returns the target connector dict on create, or ``None`` when the + connector is skipped (no metadata to reconstruct, or OAuth credentials + that the API can't clone). Skipped ids are recorded so dependent + workflows cascade-skip. + """ + metadata = src.get("connector_metadata") or {} + if not metadata: + logger.info( + "skipping connector '%s' (src=%s, catalog=%s) — metadata not " + "exposed by the API (auto-provisioned, e.g. Cloud Storage); " + "provision it on the target with the same name and re-run to adopt", + name, + src_id, + src.get("connector_id"), + ) + with lock: + result.skipped += 1 + self.ctx.skipped_connector_ids.add(src_id) + result.warnings.append( + f"connector '{name}' skipped — metadata not exposed by the API " + "(auto-provisioned); provision it on target with the same name " + "and re-run to adopt" + ) + return None + + if _has_oauth_tokens(metadata): + logger.warning( + "skipping connector '%s' (src=%s, catalog=%s) — OAuth-backed; " + "credentials can't be cloned. Recreate + authorise it on the " + "target with the same name, then re-run to adopt and wire " + "dependent workflows.", + name, + src_id, + src.get("connector_id"), + ) + with lock: + result.skipped += 1 + self.ctx.skipped_connector_ids.add(src_id) + result.warnings.append( + f"connector '{name}' skipped — OAuth-backed, credentials can't " + "be cloned; recreate + authorise on target with the same name, " + "then re-run to adopt" + ) + return None + + if self.ctx.options.dry_run: + with lock: + result.created += 1 + self.ctx.remap.record_planned("connector", src_id) + logger.info("[dry-run] would create connector '%s' src=%s", name, src_id) + return None + + payload = build_post_payload(src, self._writable) + try: + tgt = self.ctx.target.create_connector(payload) + except Exception as e: + logger.exception("Failed to create connector %s: %s", name, e) + with lock: + result.failed += 1 + result.errors.append(f"create {name}: {e}") + return None + with lock: + result.created += 1 + logger.info( + "created connector '%s' src=%s -> tgt=%s", + name, + src_id, + tgt["id"], + ) + return tgt diff --git a/src/unstract/clone/phases/custom_tool.py b/src/unstract/clone/phases/custom_tool.py index 84d3f6d..10e2312 100644 --- a/src/unstract/clone/phases/custom_tool.py +++ b/src/unstract/clone/phases/custom_tool.py @@ -300,9 +300,7 @@ def _remap_prompts( if tgt_pid: self.ctx.remap.record("prompt", sp["prompt_id"], tgt_pid) - def _record_planned_prompts( - self, src_tool_id: str, lock: threading.Lock - ) -> None: + def _record_planned_prompts(self, src_tool_id: str, lock: threading.Lock) -> None: """Dry-run: record a planned prompt remap per source prompt so prompt-scoped phases can resolve their FK and plan-count. """ @@ -507,6 +505,11 @@ def _register_frictionless_skip( src_regs = [] with lock: result.skipped += 1 + result.warnings.append( + f"tool '{tool_name}' skipped — default profile references adapters " + f"not available to clone ({', '.join(missing_adapters)}); wire " + "equivalents on target and re-run" + ) for reg in src_regs: reg_id = reg.get("prompt_registry_id") if reg_id: diff --git a/src/unstract/clone/phases/workflow.py b/src/unstract/clone/phases/workflow.py index e8b34b7..8f02e11 100644 --- a/src/unstract/clone/phases/workflow.py +++ b/src/unstract/clone/phases/workflow.py @@ -29,6 +29,14 @@ WORKFLOW_PATH = "workflow/" +def _endpoint_connector_id(endpoint: dict[str, Any]) -> str | None: + """``connector_instance`` is a nested dict on the endpoint listing.""" + ci = endpoint.get("connector_instance") + if isinstance(ci, dict): + return ci.get("id") + return ci if isinstance(ci, str) else None + + class WorkflowPhase(Phase): name = "workflow" share_path_template = "workflow/{id}/share/" @@ -53,6 +61,7 @@ def run(self, report: CloneReport) -> PhaseResult: # Built once so per-workflow cascade-skip checks stay O(1). self._wf_to_src_tool_id = self._collect_wf_tool_map(result) + self._wf_to_connector_ids = self._collect_wf_connector_map() logger.info("Found %d workflow(s) in source org", len(src_workflows)) self.parallel_map( @@ -84,22 +93,67 @@ def _collect_wf_tool_map(self, result: PhaseResult) -> dict[str, str]: mapping[wf_id] = tool_id return mapping + def _collect_wf_connector_map(self) -> dict[str, set[str]]: + """Map source workflow_id to the connector ids its endpoints use, + from a single bulk endpoint listing. Only built when connectors were + skipped, so dependent workflows can cascade-skip. + """ + if not self.ctx.skipped_connector_ids: + return {} + try: + endpoints = self.ctx.source.list_workflow_endpoints() + except Exception as e: + logger.warning( + "workflow phase: failed to list source endpoints for connector " + "cascade-skip (%s); proceeding without cascade", + e, + ) + return {} + mapping: dict[str, set[str]] = {} + for ep in endpoints: + wf_id = ep.get("workflow") + conn_id = _endpoint_connector_id(ep) + if wf_id and conn_id: + mapping.setdefault(wf_id, set()).add(conn_id) + return mapping + def _clone_one( self, src: dict[str, Any], result: PhaseResult, lock: threading.Lock ) -> None: name = src["workflow_name"] src_id = src["id"] + # Collect every blocking dependency before skipping, so a workflow + # held up by both a skipped tool and a skipped connector reports both + # reasons in one pass rather than one per re-run. + skip_reasons: list[str] = [] src_tool_id = self._wf_to_src_tool_id.get(src_id) if src_tool_id and src_tool_id in self.ctx.skipped_custom_tool_registry_ids: + skip_reasons.append( + "its tool was skipped (frictionless adapter); wire equivalents " + "on target and re-run" + ) + skipped_conns = ( + self._wf_to_connector_ids.get(src_id, set()) + & self.ctx.skipped_connector_ids + ) + if skipped_conns: + skip_reasons.append( + "depends on un-clonable connector(s) " + f"({', '.join(sorted(skipped_conns))}); provision them on target " + "with the same name and re-run" + ) + if skip_reasons: logger.warning( - "skipping workflow '%s' src=%s — its tool was skipped in " - "custom_tool phase (frictionless adapter dependence)", + "skipping workflow '%s' src=%s — %s", name, src_id, + "; ".join(skip_reasons), ) with lock: result.skipped += 1 + for reason in skip_reasons: + result.warnings.append(f"workflow '{name}' skipped — {reason}") return try: diff --git a/tests/clone/test_connector_phase.py b/tests/clone/test_connector_phase.py index 78de5f2..5fe3abb 100644 --- a/tests/clone/test_connector_phase.py +++ b/tests/clone/test_connector_phase.py @@ -116,12 +116,14 @@ def test_redacted_metadata_connector_skipped(): assert result.created == 0 assert tgt.posts == [] assert ctx.remap.resolve("connector", "src-ucs") is None + # Recorded for downstream cascade-skip + surfaced in the report. + assert ctx.skipped_connector_ids == {"src-ucs"} + assert any("User Storage" in w for w in result.warnings) -def test_oauth_connector_skipped_before_post(): - """OAuth-backed connectors (metadata carries access_token/refresh_token) - would fail target POST with OAuthTimeOut — skip ahead of POST so the - operator re-authorises post-clone. +def test_oauth_connector_skipped_when_no_target_match(): + """OAuth-backed connectors can't be recreated (credentials can't be cloned) + and aren't on the target yet — skip, record for cascade, warn. """ oauth = _src("src-gdrive", "Unstract's google drive") oauth["connector_metadata"] = { @@ -141,6 +143,41 @@ def test_oauth_connector_skipped_before_post(): assert result.failed == 0 assert tgt.posts == [] assert ctx.remap.resolve("connector", "src-gdrive") is None + assert ctx.skipped_connector_ids == {"src-gdrive"} + assert any("OAuth" in w for w in result.warnings) + + +def test_oauth_connector_adopted_when_target_exists(): + """The recovery path: operator provisioned a same-name connector on the + target (where OAuth completes), so a re-run adopts it instead of skipping — + populating the remap so dependent endpoints wire. + """ + oauth = _src("src-gdrive", "Unstract's google drive") + oauth["connector_metadata"] = { + "provider": "google-oauth2", + "access_token": "ya29.src-access", + "refresh_token": "1//src-refresh", + } + src = FakeClient([oauth]) + tgt = FakeClient( + [ + { + "id": "tgt-gdrive", + "connector_id": "gdrive|abc", + "connector_name": "Unstract's google drive", + "connector_type": "INPUT", + "connector_metadata": {"refresh_token": "1//tgt-refresh"}, + } + ] + ) + ctx = _ctx(src, tgt, on_name_conflict="adopt") + + result = ConnectorPhase(ctx).run(CloneReport()) + + assert result.adopted == 1 + assert result.skipped == 0 + assert ctx.skipped_connector_ids == set() + assert ctx.remap.resolve("connector", "src-gdrive") == "tgt-gdrive" def test_idempotency_zero_creates_on_rerun(): diff --git a/tests/clone/test_workflow_phase.py b/tests/clone/test_workflow_phase.py index e12bb3a..51a1e5f 100644 --- a/tests/clone/test_workflow_phase.py +++ b/tests/clone/test_workflow_phase.py @@ -21,7 +21,6 @@ from unstract.clone.phases.workflow import WorkflowPhase from unstract.clone.report import CloneReport - WORKFLOW_POST_SCHEMA = frozenset( { "workflow_name", @@ -42,6 +41,7 @@ def __init__(self, workflows: list[dict] | None = None): self.workflows: list[dict] = list(workflows or []) self.posts: list[dict] = [] self.tool_instances: list[dict] = [] + self.endpoints: list[dict] = [] self._next_id = 1 def get_post_schema(self, entity_path: str) -> frozenset[str]: @@ -64,6 +64,11 @@ def list_tool_instances(self, *, workflow_id: str | None = None) -> list[dict]: return list(self.tool_instances) return [ti for ti in self.tool_instances if ti.get("workflow") == workflow_id] + def list_workflow_endpoints(self, *, workflow_id: str | None = None) -> list[dict]: + if workflow_id is None: + return list(self.endpoints) + return [e for e in self.endpoints if e.get("workflow") == workflow_id] + def create_workflow(self, payload: dict) -> dict: new = dict(payload) new["id"] = f"tgt-{self._next_id:08d}-0000-0000-0000-000000000000" @@ -132,9 +137,7 @@ def test_happy_path_creates_workflow_and_remaps_connector_uuids(): def test_idempotent_rerun_adopts_existing_workflow(): src = FakeClient([_src("wf-src-1", "Invoice ETL")]) - tgt = FakeClient( - [{"id": "wf-tgt-pre", "workflow_name": "Invoice ETL"}] - ) + tgt = FakeClient([{"id": "wf-tgt-pre", "workflow_name": "Invoice ETL"}]) ctx = _ctx(src, tgt, on_name_conflict="adopt") result = WorkflowPhase(ctx).run(CloneReport()) @@ -163,9 +166,7 @@ def test_dry_run_predicts_create_without_writing(): def test_abort_on_name_conflict_raises(): src = FakeClient([_src("wf-src-1", "Invoice ETL")]) - tgt = FakeClient( - [{"id": "wf-tgt-pre", "workflow_name": "Invoice ETL"}] - ) + tgt = FakeClient([{"id": "wf-tgt-pre", "workflow_name": "Invoice ETL"}]) ctx = _ctx(src, tgt, on_name_conflict="abort") with pytest.raises(NameConflictError): @@ -194,3 +195,66 @@ def test_cascade_skip_when_workflow_tool_was_skipped(): assert [p["workflow_name"] for p in tgt.posts] == ["OK WF"] assert ctx.remap.resolve("workflow", "wf-skipped") is None assert ctx.remap.resolve("workflow", "wf-ok") is not None + assert any("its tool was skipped" in w for w in result.warnings) + + +def test_dual_skip_reports_both_tool_and_connector_reasons(): + """A workflow blocked by both a skipped tool and a skipped connector + surfaces both reasons in one pass, not one per re-run. + """ + skipped_reg = "skipped-registry-id" + skipped_conn = "oauth-conn-id" + src = FakeClient([_src("wf-both", "Blocked WF")]) + src.tool_instances = [{"workflow": "wf-both", "tool_id": skipped_reg}] + src.endpoints = [ + { + "workflow": "wf-both", + "endpoint_type": "SOURCE", + "connector_instance": {"id": skipped_conn}, + } + ] + tgt = FakeClient() + ctx = _ctx(src, tgt) + ctx.skipped_custom_tool_registry_ids.add(skipped_reg) + ctx.skipped_connector_ids.add(skipped_conn) + + result = WorkflowPhase(ctx).run(CloneReport()) + + assert result.skipped == 1 + assert tgt.posts == [] + assert ctx.remap.resolve("workflow", "wf-both") is None + assert any("its tool was skipped" in w for w in result.warnings) + assert any("un-clonable connector" in w for w in result.warnings) + + +def test_cascade_skip_when_endpoint_connector_skipped(): + """Workflow whose endpoint uses an un-clonable (OAuth/redacted) connector + must not land on target — else its pipelines fail every run. Downstream + pipeline / api_deployment cascade off the missing workflow remap. + """ + skipped_conn = "oauth-conn-id" + src = FakeClient([_src("wf-oauth", "Gdrive ETL"), _src("wf-ok", "OK WF")]) + src.endpoints = [ + { + "workflow": "wf-oauth", + "endpoint_type": "SOURCE", + "connector_instance": {"id": skipped_conn}, + }, + { + "workflow": "wf-ok", + "endpoint_type": "SOURCE", + "connector_instance": {"id": "good-conn"}, + }, + ] + tgt = FakeClient() + ctx = _ctx(src, tgt) + ctx.skipped_connector_ids.add(skipped_conn) + + result = WorkflowPhase(ctx).run(CloneReport()) + + assert result.created == 1 + assert result.skipped == 1 + assert [p["workflow_name"] for p in tgt.posts] == ["OK WF"] + assert ctx.remap.resolve("workflow", "wf-oauth") is None + assert ctx.remap.resolve("workflow", "wf-ok") is not None + assert any("un-clonable" in w for w in result.warnings)