From 73146019630f1de24afe87465d58aff546d1428a Mon Sep 17 00:00:00 2001 From: Tomasz Swierszcz Date: Mon, 22 Jun 2026 18:11:32 +0200 Subject: [PATCH] feat(ngts): add policy management (get_policy/set_policy) - VC-54745 Implement get_policy/set_policy for NGTS by reusing Cloud's CIT/CA/policy-spec helpers (build_policy_spec, build_cit_request, validate_policy_spec) while skipping the Application/owner layer NGTS has no concept of: CIT-only zone, global certificateissuingtemplates endpoint, empty users/owners (parity with Go NGTS). - Offline tests covering create/update, default-CA, no-Application-calls, and get_policy spec build (tests/test_local_methods.py) - Live get_policy + set->get round-trip tests (tests/test_ngts.py) - Bump version to 0.20.0 --- docs/version_history.md | 6 ++ setup.py | 2 +- tests/test_local_methods.py | 124 +++++++++++++++++++++++++++++++++++- tests/test_ngts.py | 31 +++++++++ vcert/connection_ngts.py | 66 ++++++++++++++++++- 5 files changed, 224 insertions(+), 5 deletions(-) diff --git a/docs/version_history.md b/docs/version_history.md index cd161f2..38b8c32 100644 --- a/docs/version_history.md +++ b/docs/version_history.md @@ -2,6 +2,12 @@ ## Version History +#### 0.20.0 +* Added policy management (`get_policy`/`set_policy`) for NGTS (Strata Cloud Manager), operating on the CIT-only zone + +#### 0.19.0 +* Added support for NGTS (Strata Cloud Manager) + #### 0.18.0 * Added support for CyberArk Certificate Manager, Self-Hosted 25.1 * Upgraded dependencies diff --git a/setup.py b/setup.py index 3b5d4c0..75003df 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ long_description = f.read() setup(name='vcert', - version='0.19.0', + version='0.20.0', url="https://github.com/Venafi/vcert-python", packages=['vcert', 'vcert.parser', 'vcert.policy'], install_requires=['requests==2.32.4', 'python-dateutil==2.8.2', 'six==1.17.0', diff --git a/tests/test_local_methods.py b/tests/test_local_methods.py index a2d41b6..80ff14a 100644 --- a/tests/test_local_methods.py +++ b/tests/test_local_methods.py @@ -25,10 +25,13 @@ from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection, NGTSConnection, logger) +from vcert.connection_cloud import URLS from vcert.connection_ngts import (_parse_ngts_zone, DEFAULT_API_URL, DEFAULT_TOKEN_URL, TRUSTED_TOKEN_HOST_SUFFIX) -from vcert.errors import ClientBadData, ServerUnexptedBehavior +from vcert.errors import ClientBadData, ServerUnexptedBehavior, VenafiError from vcert.pem import parse_pem, Certificate +from vcert.policy.pm_cloud import CertificateAuthorityDetails, CertificateAuthorityInfo +from vcert.policy.policy_spec import DEFAULT_CA, Policy, PolicySpecification pkcs12_enc_cert = """-----BEGIN CERTIFICATE----- MIICljCCAX6gAwIBAgIRAO8Qp6LUsgVDQrxHXX1LUV4wDQYJKoZIhvcNAQENBQAw @@ -505,3 +508,122 @@ def test_ngts_get_sends_bearer_header(self): conn._get("v1/certificateissuingtemplates") _, kwargs = get.call_args self.assertEqual(kwargs['headers']['Authorization'], 'Bearer pre.issued.token') + + # -- NGTS policy management (offline) ----------------------------------------------------- + # + # NGTS reuses Cloud's CIT/CA/policy-spec helpers (validate_policy_spec, build_cit_request, + # build_policy_spec - already covered by tests/test_pm.py) and only differs in the + # NGTS-specific delta: CIT-only zone, the global certificateissuingtemplates endpoint, and + # NO Application/owner layer. These tests pin that delta. We mock at the helper-method + # boundary so we exercise the NGTS orchestration without a live backend. + + @staticmethod + def _ngts_policy_spec(): + ps = PolicySpecification() + ps.policy = Policy() + ps.policy.certificate_authority = "DIGICERT\\acct-key\\Product" + return ps + + def test_ngts_set_policy_creates_cit_on_global_endpoint(self): + # No existing CIT -> POST to the global issuing-templates endpoint, named by the zone. + conn = self._ngts_conn(access_token='t', token_url=None) + with mock.patch.object(conn, '_get_ca_details', return_value=CertificateAuthorityDetails('po-1', 'org-1')), \ + mock.patch('vcert.connection_ngts.validate_policy_spec'), \ + mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \ + mock.patch.object(conn, '_get_cit', return_value=None), \ + mock.patch.object(conn, '_post', return_value=(201, {})) as post, \ + mock.patch.object(conn, '_put') as put: + conn.set_policy("my-template", self._ngts_policy_spec()) + + post.assert_called_once() + url, = post.call_args[0][:1] + self.assertEqual(url, URLS.ISSUING_TEMPLATES) + self.assertEqual(post.call_args[0][1]['name'], "my-template") + put.assert_not_called() + + def test_ngts_set_policy_updates_existing_cit(self): + # Existing CIT -> PUT to the per-id update endpoint, no create. + conn = self._ngts_conn(access_token='t', token_url=None) + with mock.patch.object(conn, '_get_ca_details', return_value=CertificateAuthorityDetails('po-1', 'org-1')), \ + mock.patch('vcert.connection_ngts.validate_policy_spec'), \ + mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \ + mock.patch.object(conn, '_get_cit', return_value={'id': 'cit-123', 'name': 'my-template'}), \ + mock.patch.object(conn, '_put', return_value=(200, {})) as put, \ + mock.patch.object(conn, '_post') as post: + conn.set_policy("my-template", self._ngts_policy_spec()) + + put.assert_called_once() + self.assertEqual(put.call_args[0][0], URLS.ISSUING_TEMPLATES_UPDATE.format('cit-123')) + post.assert_not_called() + + def test_ngts_set_policy_never_touches_application_endpoints(self): + # The Application/owner layer (Cloud-only) must never be exercised on NGTS. + conn = self._ngts_conn(access_token='t', token_url=None) + with mock.patch.object(conn, '_get_ca_details', return_value=CertificateAuthorityDetails('po-1', 'org-1')), \ + mock.patch('vcert.connection_ngts.validate_policy_spec'), \ + mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \ + mock.patch.object(conn, '_get_cit', return_value=None), \ + mock.patch.object(conn, '_post', return_value=(201, {})), \ + mock.patch.object(conn, '_put'), \ + mock.patch.object(conn, '_get_user_details') as user_details, \ + mock.patch.object(conn, '_get_app_details_by_name') as app_details, \ + mock.patch.object(conn, 'resolve_owners') as resolve_owners: + conn.set_policy("my-template", self._ngts_policy_spec()) + + user_details.assert_not_called() + app_details.assert_not_called() + resolve_owners.assert_not_called() + + def test_ngts_set_policy_defaults_ca_when_absent(self): + # Parity with Go NGTS: an unset certificate_authority defaults to DEFAULT_CA. + conn = self._ngts_conn(access_token='t', token_url=None) + ps = PolicySpecification() + ps.policy = Policy() # certificate_authority left as None + with mock.patch.object(conn, '_get_ca_details', + return_value=CertificateAuthorityDetails('po-1', 'org-1')) as ca_details, \ + mock.patch('vcert.connection_ngts.validate_policy_spec'), \ + mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \ + mock.patch.object(conn, '_get_cit', return_value=None), \ + mock.patch.object(conn, '_post', return_value=(201, {})), \ + mock.patch.object(conn, '_put'): + conn.set_policy("my-template", ps) + + ca_details.assert_called_once_with(DEFAULT_CA) + self.assertEqual(ps.policy.certificate_authority, DEFAULT_CA) + + def test_ngts_get_policy_builds_spec_without_owners(self): + # get_policy builds a real PolicySpecification from the CIT + CA info and, unlike Cloud, + # never resolves Application owners (none exist on NGTS) -> users/owners stay empty. + conn = self._ngts_conn(access_token='t', token_url=None) + cit_dict = { + 'id': 'cit-123', + 'name': 'my-template', + 'certificateAuthority': 'DIGICERT', + 'certificateAuthorityAccountId': 'acct-1', + 'certificateAuthorityProductOptionId': 'po-1', + 'subjectCNRegexes': ['.*\\.example\\.com'], + 'sanRegexes': ['.*\\.example\\.com'], + 'keyReuse': False, + 'validityPeriod': 'P90D', + 'csrUploadAllowed': True, + 'keyGeneratedByVenafiAllowed': True, + 'keyTypes': [{'keyType': 'RSA', 'keyLengths': [2048]}], + } + info = CertificateAuthorityInfo('DIGICERT', 'acct-key', 'Product') + with mock.patch.object(conn, '_get_cit', return_value=cit_dict), \ + mock.patch.object(conn, '_get_ca_info', return_value=info), \ + mock.patch.object(conn, 'resolve_cloud_owners_names') as resolve_owners: + ps = conn.get_policy("my-template") + + resolve_owners.assert_not_called() + self.assertIsInstance(ps, PolicySpecification) + self.assertEqual(ps.policy.max_valid_days, 90) + self.assertEqual(ps.policy.certificate_authority, "DIGICERT\\acct-key\\Product") + self.assertEqual(ps.users, []) + self.assertEqual(ps.owners, []) + + def test_ngts_get_policy_missing_cit_raises(self): + conn = self._ngts_conn(access_token='t', token_url=None) + with mock.patch.object(conn, '_get_cit', return_value=None): + with self.assertRaises(VenafiError): + conn.get_policy("does-not-exist") diff --git a/tests/test_ngts.py b/tests/test_ngts.py index b215e5c..2f03b07 100644 --- a/tests/test_ngts.py +++ b/tests/test_ngts.py @@ -29,6 +29,7 @@ from test_utils import random_word, enroll, renew, renew_by_thumbprint from vcert import NGTSConnection, KeyType, logger from vcert.common import RetireRequest +from vcert.policy.policy_spec import Policy, PolicySpecification log = logger.get_child("test-ngts") @@ -82,6 +83,36 @@ def test_ngts_read_zone_invalid_zone(self): with self.assertRaises(Exception): self.ngts_conn.read_zone_conf(f"non-existent-cit-{random_word(8)}") + def test_ngts_get_policy(self): + # Read-only: get_policy on the configured zone. NGTS has no Application layer, so users + # are always empty (parity with Go NGTS). + ps = self.ngts_conn.get_policy(self.ngts_zone) + self.assertIsNotNone(ps.policy) + self.assertTrue(ps.policy.certificate_authority) + self.assertEqual(ps.users, []) + + def test_ngts_set_get_policy_roundtrip(self): + # set_policy mutates a CIT by alias, so use a THROWAWAY alias - never self.ngts_zone, + # which the other tests depend on. The CA is read from the existing zone so it is + # guaranteed valid for this tenant. + existing = self.ngts_conn.get_policy(self.ngts_zone) + ca = existing.policy.certificate_authority + + ps = PolicySpecification() + ps.policy = Policy( + domains=["venafi.example"], + max_valid_days=90, + cert_auth=ca, + ) + throwaway_zone = f"vcert-python-pmtest-{random_word(8)}" + self.ngts_conn.set_policy(throwaway_zone, ps) + + result = self.ngts_conn.get_policy(throwaway_zone) + self.assertEqual(result.policy.certificate_authority, ca) + self.assertEqual(result.policy.max_valid_days, 90) + self.assertIn("venafi.example", result.policy.domains) + self.assertEqual(result.users, []) + if __name__ == '__main__': unittest.main() diff --git a/vcert/connection_ngts.py b/vcert/connection_ngts.py index 6e7baf9..cf99efb 100644 --- a/vcert/connection_ngts.py +++ b/vcert/connection_ngts.py @@ -26,6 +26,8 @@ CertificateRenewError, VenafiError) from .http_status import HTTPStatus from .logger import get_child +from .policy.pm_cloud import build_policy_spec, build_cit_request, validate_policy_spec +from .policy.policy_spec import DEFAULT_CA # OAuth2 access tokens issued by Strata Cloud Manager live ~15 minutes. Refresh a little # ahead of expiry so in-flight calls never race the boundary (mirrors Go's @@ -456,13 +458,71 @@ def read_zone_conf(self, zone): ) return z - # -- Out of scope for NGTS ---------------------------------------------------------------- + # -- Policy management (deltas vs Cloud) -------------------------------------------------- def get_policy(self, zone): - raise NotImplementedError + """ + Build a PolicySpecification from the NGTS Certificate Issuing Template (CIT) named by + ``zone``. Mirrors Go's NGTS ``GetPolicy`` and Cloud's ``_get_policy`` minus the + Application-owner resolution: NGTS has no Application layer, so there are no owners/users + to resolve and ``users``/``owners`` stay empty (parity with Go NGTS). + + :param str zone: the CIT alias (NGTS zones are a CIT alias only - no Application\\CIT split) + :rtype: PolicySpecification + """ + cit_data = self._get_cit_or_fail(zone) + cit = self._parse_policy_response_to_object(cit_data) + + info = self._get_ca_info(cit.cert_authority, cit.cert_authority_account_id, + cit.cert_authority_product_option_id) + if not info: + raise VenafiError("Certificate Authority info not found") + + ps = build_policy_spec(cit, info, subject_cn_to_str=True) + return ps def set_policy(self, zone, policy_spec): - raise NotImplementedError + """ + Create or update the NGTS Certificate Issuing Template (CIT) named by ``zone`` from + ``policy_spec``. Mirrors Go's NGTS ``SetPolicy`` and Cloud's ``set_policy`` with the + Application create/link and owner handling removed: NGTS has no Application layer, so the + CIT is created/updated directly on the global issuing-template endpoint and + ``policy_spec.users`` is ignored (parity with Go NGTS). + + :param str zone: the CIT alias (NGTS zones are a CIT alias only - no Application\\CIT split) + :param PolicySpecification policy_spec: + """ + validate_policy_spec(policy_spec) + cit_alias = _parse_ngts_zone(zone) + + if not policy_spec.policy: + raise VenafiError("Policy is required") + if not policy_spec.policy.certificate_authority: + # Default the CA exactly as Go's NGTS SetPolicy does when none is supplied. + policy_spec.policy.certificate_authority = DEFAULT_CA + + ca_details = self._get_ca_details(policy_spec.policy.certificate_authority) + if not ca_details: + raise VenafiError(f"CA [{policy_spec.policy.certificate_authority}] not found in " + f"Strata Cloud Manager") + + request = build_cit_request(policy_spec, ca_details) + request['name'] = cit_alias + + cit_data = self._get_cit(cit_alias) + if cit_data: + # Issuing Template exists. Update + status, _ = self._put(URLS.ISSUING_TEMPLATES_UPDATE.format(cit_data['id']), request) + if status != HTTPStatus.OK: + raise VenafiError(f"Failed to update issuing template [{cit_data['id']}] for zone [{zone}]") + else: + # Issuing Template does not exist. Create one + status, _ = self._post(URLS.ISSUING_TEMPLATES, request) + if status != HTTPStatus.CREATED: + raise VenafiError(f"Failed to create issuing template for zone [{zone}]") + return + + # -- Out of scope for NGTS ---------------------------------------------------------------- def get_version(self): raise NotImplementedError