-
Notifications
You must be signed in to change notification settings - Fork 43
fix: thread safety issues #614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| import logging | ||
| import threading | ||
| import typing | ||
| from collections.abc import Awaitable, Mapping, Sequence | ||
| from dataclasses import dataclass | ||
|
|
@@ -86,6 +87,7 @@ def __init__( | |
| self.version = version | ||
| self.context = context or EvaluationContext() | ||
| self.hooks = hooks or [] | ||
| self._hooks_lock = threading.Lock() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. same here |
||
|
|
||
| @property | ||
| def provider(self) -> FeatureProvider: | ||
|
|
@@ -98,7 +100,10 @@ def get_metadata(self) -> ClientMetadata: | |
| return ClientMetadata(domain=self.domain) | ||
|
|
||
| def add_hooks(self, hooks: list[Hook]) -> None: | ||
| self.hooks = self.hooks + hooks | ||
| # Guards the read-concat-store against a lost update; this practically never races under the default 5ms GIL | ||
| # switch interval, but is essential under a no-GIL build. | ||
| with self._hooks_lock: | ||
| self.hooks = self.hooks + hooks | ||
|
|
||
| def get_boolean_value( | ||
| self, | ||
|
|
@@ -468,8 +473,9 @@ def _establish_hooks_and_provider( | |
|
|
||
| def _assert_provider_status( | ||
| self, | ||
| provider: FeatureProvider, | ||
| ) -> OpenFeatureError | None: | ||
| status = self.get_provider_status() | ||
| status = provider_registry.get_provider_status(provider) | ||
| if status == ProviderStatus.NOT_READY: | ||
| return ProviderNotReadyError() | ||
| if status == ProviderStatus.FATAL: | ||
|
|
@@ -589,7 +595,7 @@ async def evaluate_flag_details_async( | |
| ) | ||
|
|
||
| try: | ||
| if provider_err := self._assert_provider_status(): | ||
| if provider_err := self._assert_provider_status(provider): | ||
| error_hooks( | ||
| flag_type, | ||
| provider_err, | ||
|
|
@@ -765,7 +771,7 @@ def evaluate_flag_details( | |
| ) | ||
|
|
||
| try: | ||
| if provider_err := self._assert_provider_status(): | ||
| if provider_err := self._assert_provider_status(provider): | ||
| error_hooks( | ||
| flag_type, | ||
| provider_err, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import threading | ||
| import typing | ||
| from collections.abc import Mapping, MutableMapping, Sequence | ||
| from datetime import datetime | ||
|
|
@@ -24,6 +25,7 @@ | |
| ] | ||
|
|
||
| _hooks: list[Hook] = [] | ||
| _hooks_lock = threading.Lock() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. can you use an |
||
|
|
||
|
|
||
| # https://openfeature.dev/specification/sections/hooks/#requirement-461 | ||
|
|
@@ -151,14 +153,21 @@ def supports_flag_value_type(self, flag_type: FlagType) -> bool: | |
| return True | ||
|
|
||
|
|
||
| # While the lock guarantees safety, no lost update was ever observed without it in 50,000 runs (with the default GIL | ||
| # switch interval of 5 ms). Only when the switch interval was shortened drastically to 0.1 microseconds were losses | ||
| # occasionally observed without the lock. With a no-GIL Python build, the lock would be essential. | ||
|
|
||
|
|
||
| def add_hooks(hooks: list[Hook]) -> None: | ||
| global _hooks | ||
| _hooks = _hooks + hooks | ||
| with _hooks_lock: | ||
| global _hooks | ||
| _hooks = _hooks + hooks | ||
|
|
||
|
|
||
| def clear_hooks() -> None: | ||
| global _hooks | ||
| _hooks = [] | ||
| with _hooks_lock: | ||
| global _hooks | ||
| _hooks = [] | ||
|
|
||
|
|
||
| def get_hooks() -> list[Hook]: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -261,5 +261,6 @@ def emit_provider_stale(self, details: ProviderEventDetails) -> None: | |
| self.emit(ProviderEvent.PROVIDER_STALE, details) | ||
|
|
||
| def emit(self, event: ProviderEvent, details: ProviderEventDetails) -> None: | ||
| if hasattr(self, "_on_emit"): | ||
| self._on_emit(self, event, details) | ||
| on_emit = getattr(self, "_on_emit", None) | ||
| if on_emit is not None: | ||
| on_emit(self, event, details) | ||
|
Comment on lines
-264
to
+266
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. why is this change needed? |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| import threading | ||
|
|
||
| from openfeature._event_support import clear as clear_event_handlers | ||
| from openfeature._event_support import run_handlers_for_provider | ||
| from openfeature.evaluation_context import EvaluationContext, get_evaluation_context | ||
| from openfeature.event import ( | ||
|
|
@@ -54,9 +55,16 @@ def set_provider( | |
| self._shutdown_if_unused(old_provider) | ||
|
|
||
| def get_provider(self, domain: str | None) -> FeatureProvider: | ||
| if domain is None: | ||
| return self._default_provider | ||
| return self._providers.get(domain, self._default_provider) | ||
| # defensive lock under the GIL as the op is basically atomic | ||
| # but we might want to keep it so a provider that's about | ||
| # to be shut down isn't returned | ||
| # however it contributes to a potential deadlock that is currently | ||
| # still in place (clear_providers: registry's lock -> _event_support's lock; | ||
| # run_handlers_for_provider: _event_support's lock -> registry's lock) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. just writing a comment here as a TODO that still needs to be resolved before merging
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. i don't think we gain anything by adding a lock here, because the shutdown can also happen directly after you get the provider. |
||
| with self._lock: | ||
| if domain is None: | ||
| return self._default_provider | ||
| return self._providers.get(domain, self._default_provider) | ||
|
|
||
| def set_default_provider( | ||
| self, provider: FeatureProvider, wait_for_init: bool = False | ||
|
|
@@ -83,7 +91,8 @@ def set_default_provider( | |
| self._shutdown_if_unused(old_provider) | ||
|
|
||
| def get_default_provider(self) -> FeatureProvider: | ||
| return self._default_provider | ||
| with self._lock: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. same here, not needed |
||
| return self._default_provider | ||
|
|
||
| def clear_providers(self) -> None: | ||
| self.shutdown() | ||
|
|
@@ -93,11 +102,13 @@ def clear_providers(self) -> None: | |
| self._provider_status = { | ||
| self._default_provider: ProviderStatus.READY, | ||
| } | ||
| clear_event_handlers() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this is already handled in the API level |
||
|
|
||
| def shutdown(self) -> None: | ||
| with self._lock: | ||
| providers = {self._default_provider, *self._providers.values()} | ||
|
|
||
| # do we want to move this inside the lock? it allows a narrow double-shutdown window | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this was done intentionally to not hold the lock for too long and risk a deadlock |
||
| for provider in providers: | ||
| self._shutdown_provider(provider) | ||
|
|
||
|
|
@@ -214,7 +225,12 @@ def _shutdown_provider( | |
| provider.detach() | ||
|
|
||
| def get_provider_status(self, provider: FeatureProvider) -> ProviderStatus: | ||
| return self._provider_status.get(provider, ProviderStatus.NOT_READY) | ||
| # defensive lock under the GIL as the op is basically atomic | ||
| # but we might want to keep it so a provider that's about | ||
| # to be shut down isn't returned | ||
| # however, removing it would enable moving _run_immediate_handler into the lock i think | ||
| with self._lock: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. as mentioned before, not needed. |
||
| return self._provider_status.get(provider, ProviderStatus.NOT_READY) | ||
|
|
||
| def dispatch_event( | ||
| self, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| import threading | ||
|
|
||
| from openfeature.evaluation_context import EvaluationContext | ||
| from openfeature.transaction_context.context_var_transaction_context_propagator import ( | ||
| ContextVarsTransactionContextPropagator, | ||
|
|
@@ -21,25 +23,28 @@ | |
| _evaluation_transaction_context_propagator: TransactionContextPropagator = ( | ||
| NoOpTransactionContextPropagator() | ||
| ) | ||
| _propagator_lock = threading.RLock() | ||
|
|
||
|
|
||
| def set_transaction_context_propagator( | ||
| transaction_context_propagator: TransactionContextPropagator, | ||
| ) -> None: | ||
| global _evaluation_transaction_context_propagator | ||
| _evaluation_transaction_context_propagator = transaction_context_propagator | ||
| with _propagator_lock: | ||
| _evaluation_transaction_context_propagator = transaction_context_propagator | ||
|
|
||
|
|
||
| def clear_transaction_context_propagator() -> None: | ||
| set_transaction_context_propagator(NoOpTransactionContextPropagator()) | ||
|
|
||
|
|
||
| def get_transaction_context() -> EvaluationContext: | ||
| return _evaluation_transaction_context_propagator.get_transaction_context() | ||
| with _propagator_lock: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. not sure about this one here, if this is really needed or not |
||
| propagator = _evaluation_transaction_context_propagator | ||
| return propagator.get_transaction_context() | ||
|
|
||
|
|
||
| def set_transaction_context(evaluation_context: EvaluationContext) -> None: | ||
| global _evaluation_transaction_context_propagator | ||
| _evaluation_transaction_context_propagator.set_transaction_context( | ||
| evaluation_context | ||
| ) | ||
| with _propagator_lock: | ||
| propagator = _evaluation_transaction_context_propagator | ||
| propagator.set_transaction_context(evaluation_context) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and this should be added back