Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/version_history.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Version History

#### 0.20.0
* Added policy management (`get_policy`/`set_policy`) for NGTS (Strata Cloud Manager), operating on the CIT-only zone

#### 0.19.0
* Added support for NGTS (Strata Cloud Manager)

#### 0.18.0
* Added support for CyberArk Certificate Manager, Self-Hosted 25.1
* Upgraded dependencies
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
long_description = f.read()

setup(name='vcert',
version='0.19.0',
version='0.20.0',
url="https://github.com/Venafi/vcert-python",
packages=['vcert', 'vcert.parser', 'vcert.policy'],
install_requires=['requests==2.32.4', 'python-dateutil==2.8.2', 'six==1.17.0',
Expand Down
124 changes: 123 additions & 1 deletion tests/test_local_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN
from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection,
NGTSConnection, logger)
from vcert.connection_cloud import URLS
from vcert.connection_ngts import (_parse_ngts_zone, DEFAULT_API_URL, DEFAULT_TOKEN_URL,
TRUSTED_TOKEN_HOST_SUFFIX)
from vcert.errors import ClientBadData, ServerUnexptedBehavior
from vcert.errors import ClientBadData, ServerUnexptedBehavior, VenafiError
from vcert.pem import parse_pem, Certificate
from vcert.policy.pm_cloud import CertificateAuthorityDetails, CertificateAuthorityInfo
from vcert.policy.policy_spec import DEFAULT_CA, Policy, PolicySpecification

pkcs12_enc_cert = """-----BEGIN CERTIFICATE-----
MIICljCCAX6gAwIBAgIRAO8Qp6LUsgVDQrxHXX1LUV4wDQYJKoZIhvcNAQENBQAw
Expand Down Expand Up @@ -505,3 +508,122 @@ def test_ngts_get_sends_bearer_header(self):
conn._get("v1/certificateissuingtemplates")
_, kwargs = get.call_args
self.assertEqual(kwargs['headers']['Authorization'], 'Bearer pre.issued.token')

# -- NGTS policy management (offline) -----------------------------------------------------
#
# NGTS reuses Cloud's CIT/CA/policy-spec helpers (validate_policy_spec, build_cit_request,
# build_policy_spec - already covered by tests/test_pm.py) and only differs in the
# NGTS-specific delta: CIT-only zone, the global certificateissuingtemplates endpoint, and
# NO Application/owner layer. These tests pin that delta. We mock at the helper-method
# boundary so we exercise the NGTS orchestration without a live backend.

@staticmethod
def _ngts_policy_spec():
ps = PolicySpecification()
ps.policy = Policy()
ps.policy.certificate_authority = "DIGICERT\\acct-key\\Product"
return ps

def test_ngts_set_policy_creates_cit_on_global_endpoint(self):
# No existing CIT -> POST to the global issuing-templates endpoint, named by the zone.
conn = self._ngts_conn(access_token='t', token_url=None)
with mock.patch.object(conn, '_get_ca_details', return_value=CertificateAuthorityDetails('po-1', 'org-1')), \
mock.patch('vcert.connection_ngts.validate_policy_spec'), \
mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \
mock.patch.object(conn, '_get_cit', return_value=None), \
mock.patch.object(conn, '_post', return_value=(201, {})) as post, \
mock.patch.object(conn, '_put') as put:
conn.set_policy("my-template", self._ngts_policy_spec())

post.assert_called_once()
url, = post.call_args[0][:1]
self.assertEqual(url, URLS.ISSUING_TEMPLATES)
self.assertEqual(post.call_args[0][1]['name'], "my-template")
put.assert_not_called()

def test_ngts_set_policy_updates_existing_cit(self):
# Existing CIT -> PUT to the per-id update endpoint, no create.
conn = self._ngts_conn(access_token='t', token_url=None)
with mock.patch.object(conn, '_get_ca_details', return_value=CertificateAuthorityDetails('po-1', 'org-1')), \
mock.patch('vcert.connection_ngts.validate_policy_spec'), \
mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \
mock.patch.object(conn, '_get_cit', return_value={'id': 'cit-123', 'name': 'my-template'}), \
mock.patch.object(conn, '_put', return_value=(200, {})) as put, \
mock.patch.object(conn, '_post') as post:
conn.set_policy("my-template", self._ngts_policy_spec())

put.assert_called_once()
self.assertEqual(put.call_args[0][0], URLS.ISSUING_TEMPLATES_UPDATE.format('cit-123'))
post.assert_not_called()

def test_ngts_set_policy_never_touches_application_endpoints(self):
# The Application/owner layer (Cloud-only) must never be exercised on NGTS.
conn = self._ngts_conn(access_token='t', token_url=None)
with mock.patch.object(conn, '_get_ca_details', return_value=CertificateAuthorityDetails('po-1', 'org-1')), \
mock.patch('vcert.connection_ngts.validate_policy_spec'), \
mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \
mock.patch.object(conn, '_get_cit', return_value=None), \
mock.patch.object(conn, '_post', return_value=(201, {})), \
mock.patch.object(conn, '_put'), \
mock.patch.object(conn, '_get_user_details') as user_details, \
mock.patch.object(conn, '_get_app_details_by_name') as app_details, \
mock.patch.object(conn, 'resolve_owners') as resolve_owners:
conn.set_policy("my-template", self._ngts_policy_spec())

user_details.assert_not_called()
app_details.assert_not_called()
resolve_owners.assert_not_called()

def test_ngts_set_policy_defaults_ca_when_absent(self):
# Parity with Go NGTS: an unset certificate_authority defaults to DEFAULT_CA.
conn = self._ngts_conn(access_token='t', token_url=None)
ps = PolicySpecification()
ps.policy = Policy() # certificate_authority left as None
with mock.patch.object(conn, '_get_ca_details',
return_value=CertificateAuthorityDetails('po-1', 'org-1')) as ca_details, \
mock.patch('vcert.connection_ngts.validate_policy_spec'), \
mock.patch('vcert.connection_ngts.build_cit_request', return_value={}), \
mock.patch.object(conn, '_get_cit', return_value=None), \
mock.patch.object(conn, '_post', return_value=(201, {})), \
mock.patch.object(conn, '_put'):
conn.set_policy("my-template", ps)

ca_details.assert_called_once_with(DEFAULT_CA)
self.assertEqual(ps.policy.certificate_authority, DEFAULT_CA)

def test_ngts_get_policy_builds_spec_without_owners(self):
# get_policy builds a real PolicySpecification from the CIT + CA info and, unlike Cloud,
# never resolves Application owners (none exist on NGTS) -> users/owners stay empty.
conn = self._ngts_conn(access_token='t', token_url=None)
cit_dict = {
'id': 'cit-123',
'name': 'my-template',
'certificateAuthority': 'DIGICERT',
'certificateAuthorityAccountId': 'acct-1',
'certificateAuthorityProductOptionId': 'po-1',
'subjectCNRegexes': ['.*\\.example\\.com'],
'sanRegexes': ['.*\\.example\\.com'],
'keyReuse': False,
'validityPeriod': 'P90D',
'csrUploadAllowed': True,
'keyGeneratedByVenafiAllowed': True,
'keyTypes': [{'keyType': 'RSA', 'keyLengths': [2048]}],
}
info = CertificateAuthorityInfo('DIGICERT', 'acct-key', 'Product')
with mock.patch.object(conn, '_get_cit', return_value=cit_dict), \
mock.patch.object(conn, '_get_ca_info', return_value=info), \
mock.patch.object(conn, 'resolve_cloud_owners_names') as resolve_owners:
ps = conn.get_policy("my-template")

resolve_owners.assert_not_called()
self.assertIsInstance(ps, PolicySpecification)
self.assertEqual(ps.policy.max_valid_days, 90)
self.assertEqual(ps.policy.certificate_authority, "DIGICERT\\acct-key\\Product")
self.assertEqual(ps.users, [])
self.assertEqual(ps.owners, [])

def test_ngts_get_policy_missing_cit_raises(self):
conn = self._ngts_conn(access_token='t', token_url=None)
with mock.patch.object(conn, '_get_cit', return_value=None):
with self.assertRaises(VenafiError):
conn.get_policy("does-not-exist")
31 changes: 31 additions & 0 deletions tests/test_ngts.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from test_utils import random_word, enroll, renew, renew_by_thumbprint
from vcert import NGTSConnection, KeyType, logger
from vcert.common import RetireRequest
from vcert.policy.policy_spec import Policy, PolicySpecification

log = logger.get_child("test-ngts")

Expand Down Expand Up @@ -82,6 +83,36 @@ def test_ngts_read_zone_invalid_zone(self):
with self.assertRaises(Exception):
self.ngts_conn.read_zone_conf(f"non-existent-cit-{random_word(8)}")

def test_ngts_get_policy(self):
# Read-only: get_policy on the configured zone. NGTS has no Application layer, so users
# are always empty (parity with Go NGTS).
ps = self.ngts_conn.get_policy(self.ngts_zone)
self.assertIsNotNone(ps.policy)
self.assertTrue(ps.policy.certificate_authority)
self.assertEqual(ps.users, [])

def test_ngts_set_get_policy_roundtrip(self):
# set_policy mutates a CIT by alias, so use a THROWAWAY alias - never self.ngts_zone,
# which the other tests depend on. The CA is read from the existing zone so it is
# guaranteed valid for this tenant.
existing = self.ngts_conn.get_policy(self.ngts_zone)
ca = existing.policy.certificate_authority

ps = PolicySpecification()
ps.policy = Policy(
domains=["venafi.example"],
max_valid_days=90,
cert_auth=ca,
)
throwaway_zone = f"vcert-python-pmtest-{random_word(8)}"
self.ngts_conn.set_policy(throwaway_zone, ps)

result = self.ngts_conn.get_policy(throwaway_zone)
self.assertEqual(result.policy.certificate_authority, ca)
self.assertEqual(result.policy.max_valid_days, 90)
self.assertIn("venafi.example", result.policy.domains)
self.assertEqual(result.users, [])


if __name__ == '__main__':
unittest.main()
66 changes: 63 additions & 3 deletions vcert/connection_ngts.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
CertificateRenewError, VenafiError)
from .http_status import HTTPStatus
from .logger import get_child
from .policy.pm_cloud import build_policy_spec, build_cit_request, validate_policy_spec
from .policy.policy_spec import DEFAULT_CA

# OAuth2 access tokens issued by Strata Cloud Manager live ~15 minutes. Refresh a little
# ahead of expiry so in-flight calls never race the boundary (mirrors Go's
Expand Down Expand Up @@ -456,13 +458,71 @@ def read_zone_conf(self, zone):
)
return z

# -- Out of scope for NGTS ----------------------------------------------------------------
# -- Policy management (deltas vs Cloud) --------------------------------------------------

def get_policy(self, zone):
raise NotImplementedError
"""
Build a PolicySpecification from the NGTS Certificate Issuing Template (CIT) named by
``zone``. Mirrors Go's NGTS ``GetPolicy`` and Cloud's ``_get_policy`` minus the
Application-owner resolution: NGTS has no Application layer, so there are no owners/users
to resolve and ``users``/``owners`` stay empty (parity with Go NGTS).

:param str zone: the CIT alias (NGTS zones are a CIT alias only - no Application\\CIT split)
:rtype: PolicySpecification
"""
cit_data = self._get_cit_or_fail(zone)
cit = self._parse_policy_response_to_object(cit_data)

info = self._get_ca_info(cit.cert_authority, cit.cert_authority_account_id,
cit.cert_authority_product_option_id)
if not info:
raise VenafiError("Certificate Authority info not found")

ps = build_policy_spec(cit, info, subject_cn_to_str=True)
return ps

def set_policy(self, zone, policy_spec):
raise NotImplementedError
"""
Create or update the NGTS Certificate Issuing Template (CIT) named by ``zone`` from
``policy_spec``. Mirrors Go's NGTS ``SetPolicy`` and Cloud's ``set_policy`` with the
Application create/link and owner handling removed: NGTS has no Application layer, so the
CIT is created/updated directly on the global issuing-template endpoint and
``policy_spec.users`` is ignored (parity with Go NGTS).

:param str zone: the CIT alias (NGTS zones are a CIT alias only - no Application\\CIT split)
:param PolicySpecification policy_spec:
"""
validate_policy_spec(policy_spec)
cit_alias = _parse_ngts_zone(zone)

if not policy_spec.policy:
raise VenafiError("Policy is required")
if not policy_spec.policy.certificate_authority:
# Default the CA exactly as Go's NGTS SetPolicy does when none is supplied.
policy_spec.policy.certificate_authority = DEFAULT_CA

ca_details = self._get_ca_details(policy_spec.policy.certificate_authority)
if not ca_details:
raise VenafiError(f"CA [{policy_spec.policy.certificate_authority}] not found in "
f"Strata Cloud Manager")

request = build_cit_request(policy_spec, ca_details)
request['name'] = cit_alias

cit_data = self._get_cit(cit_alias)
if cit_data:
# Issuing Template exists. Update
status, _ = self._put(URLS.ISSUING_TEMPLATES_UPDATE.format(cit_data['id']), request)
if status != HTTPStatus.OK:
raise VenafiError(f"Failed to update issuing template [{cit_data['id']}] for zone [{zone}]")
else:
# Issuing Template does not exist. Create one
status, _ = self._post(URLS.ISSUING_TEMPLATES, request)
if status != HTTPStatus.CREATED:
raise VenafiError(f"Failed to create issuing template for zone [{zone}]")
return

# -- Out of scope for NGTS ----------------------------------------------------------------

def get_version(self):
raise NotImplementedError