diff --git a/mixpanel/flags/local_feature_flags.py b/mixpanel/flags/local_feature_flags.py index ed13ee0..5cc1e73 100644 --- a/mixpanel/flags/local_feature_flags.py +++ b/mixpanel/flags/local_feature_flags.py @@ -511,12 +511,23 @@ def _track_exposure( if latency_in_seconds is not None: properties["Variant fetch latency (ms)"] = latency_in_seconds * 1000 - self._tracker(distinct_id, EXPOSURE_EVENT, properties) + self._dispatch_exposure(distinct_id, properties) else: logger.error( "Cannot track exposure event without a distinct_id in the context" ) + def _dispatch_exposure(self, distinct_id: str, properties: dict[str, Any]) -> None: + if (executor := self._config.exposure_executor) is not None: + try: + executor.submit(self._tracker, distinct_id, EXPOSURE_EVENT, properties) + except RuntimeError: + logger.exception( + "Exposure event dropped — executor refused to accept task" + ) + return + self._tracker(distinct_id, EXPOSURE_EVENT, properties) + async def __aenter__(self): return self diff --git a/mixpanel/flags/remote_feature_flags.py b/mixpanel/flags/remote_feature_flags.py index b26f4af..af0d40a 100644 --- a/mixpanel/flags/remote_feature_flags.py +++ b/mixpanel/flags/remote_feature_flags.py @@ -237,7 +237,7 @@ def get_variant( properties = self._build_tracking_properties( flag_key, selected_variant, start_time, end_time ) - self._tracker(distinct_id, EXPOSURE_EVENT, properties) + self._dispatch_exposure(distinct_id, properties) except Exception: logger.exception("Failed to get remote variant for flag '%s'", flag_key) @@ -267,12 +267,23 @@ def track_exposure_event( """ if distinct_id := context.get("distinct_id"): properties = self._build_tracking_properties(flag_key, variant) - self._tracker(distinct_id, EXPOSURE_EVENT, properties) + self._dispatch_exposure(distinct_id, properties) else: logger.error( "Cannot track exposure event without a distinct_id in the context" ) + def _dispatch_exposure(self, distinct_id: str, properties: dict[str, Any]) -> None: + if (executor := self._config.exposure_executor) is not None: + try: + executor.submit(self._tracker, distinct_id, EXPOSURE_EVENT, properties) + except RuntimeError: + logger.exception( + "Exposure event dropped — executor refused to accept task" + ) + return + self._tracker(distinct_id, EXPOSURE_EVENT, properties) + def _prepare_query_params( self, context: dict[str, Any], flag_key: str | None = None ) -> dict[str, str]: diff --git a/mixpanel/flags/test_local_feature_flags.py b/mixpanel/flags/test_local_feature_flags.py index 592e3ee..2177df1 100644 --- a/mixpanel/flags/test_local_feature_flags.py +++ b/mixpanel/flags/test_local_feature_flags.py @@ -2,6 +2,7 @@ import asyncio import threading +from concurrent.futures import ThreadPoolExecutor from itertools import chain, repeat from typing import Any from unittest.mock import Mock, patch @@ -646,6 +647,83 @@ async def test_get_variant_value_does_not_track_exposure_without_distinct_id(sel ) self._mock_tracker.assert_not_called() + @respx.mock + async def test_default_exposure_runs_inline_on_calling_thread(self): + """Smoke test: exposure_executor defaults to None, tracker runs inline.""" + flag = create_test_flag(rollout_percentage=100.0) + await self.setup_flags([flag]) + + called_on: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + called_on.append(threading.current_thread()) + + self._mock_tracker.side_effect = tracker + self._flags.get_variant_value(TEST_FLAG_KEY, "fallback", USER_CONTEXT) + assert called_on == [threading.current_thread()] + + @respx.mock + async def test_track_exposure_event_routes_through_executor(self): + """Manual API also honors exposure_executor.""" + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + tracker_done = threading.Event() + captured: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured.append(threading.current_thread()) + tracker_done.set() + + config = LocalFlagsConfig(enable_polling=False, exposure_executor=executor) + provider = LocalFeatureFlagsProvider( + "test-token", config, "1.0.0", Mock(side_effect=tracker) + ) + try: + provider.track_exposure_event( + "manual", + SelectedVariant(variant_key="treatment", variant_value="x"), + USER_CONTEXT, + ) + assert tracker_done.wait(timeout=2.0) + assert captured[0] is not threading.current_thread() + assert captured[0].name.startswith("exposure") + finally: + await provider.__aexit__(None, None, None) + finally: + executor.shutdown(wait=True) + + @respx.mock + async def test_exposure_executor_dispatches_tracker_off_calling_thread(self): + flag = create_test_flag(rollout_percentage=100.0) + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + calling_thread = threading.current_thread() + tracker_thread = threading.Event() + captured_thread: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured_thread.append(threading.current_thread()) + tracker_thread.set() + + tracker_mock = Mock(side_effect=tracker) + config = LocalFlagsConfig(enable_polling=False, exposure_executor=executor) + provider = LocalFeatureFlagsProvider( + "test-token", config, "1.0.0", tracker_mock + ) + try: + respx.get("https://api.mixpanel.com/flags/definitions").mock( + return_value=create_flags_response([flag]) + ) + await provider.astart_polling_for_definitions() + provider.get_variant_value(TEST_FLAG_KEY, "fallback", USER_CONTEXT) + assert tracker_thread.wait(timeout=2.0), "tracker never ran" + assert captured_thread[0] is not calling_thread + assert captured_thread[0].name.startswith("exposure") + finally: + await provider.__aexit__(None, None, None) + finally: + executor.shutdown(wait=True) + @respx.mock async def test_get_all_variants_returns_all_variants_when_user_in_rollout(self): flag1 = create_test_flag(flag_key="flag1", rollout_percentage=100.0) diff --git a/mixpanel/flags/test_remote_feature_flags.py b/mixpanel/flags/test_remote_feature_flags.py index 5b5dd71..164ba5a 100644 --- a/mixpanel/flags/test_remote_feature_flags.py +++ b/mixpanel/flags/test_remote_feature_flags.py @@ -1,6 +1,8 @@ from __future__ import annotations import asyncio +import threading +from concurrent.futures import ThreadPoolExecutor from unittest.mock import Mock import httpx @@ -281,6 +283,96 @@ def test_get_variant_value_does_not_track_exposure_event_if_fallback(self): ) self.mock_tracker.assert_not_called() + def test_default_exposure_runs_inline_on_calling_thread(self): + """Smoke test: exposure_executor defaults to None, tracker runs inline.""" + called_on: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + called_on.append(threading.current_thread()) + + provider = RemoteFeatureFlagsProvider( + "test-token", RemoteFlagsConfig(), "1.0.0", tracker + ) + try: + provider.track_exposure_event( + "manual", + SelectedVariant(variant_key="treatment", variant_value="x"), + {"distinct_id": "user123"}, + ) + assert called_on == [threading.current_thread()] + finally: + provider.__exit__(None, None, None) + + def test_track_exposure_event_routes_through_executor(self): + """Manual API also honors exposure_executor.""" + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + tracker_done = threading.Event() + captured: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured.append(threading.current_thread()) + tracker_done.set() + + provider = RemoteFeatureFlagsProvider( + "test-token", + RemoteFlagsConfig(exposure_executor=executor), + "1.0.0", + Mock(side_effect=tracker), + ) + try: + provider.track_exposure_event( + "manual", + SelectedVariant(variant_key="treatment", variant_value="x"), + {"distinct_id": "user123"}, + ) + assert tracker_done.wait(timeout=2.0) + assert captured[0] is not threading.current_thread() + assert captured[0].name.startswith("exposure") + finally: + provider.__exit__(None, None, None) + finally: + executor.shutdown(wait=True) + + @respx.mock + def test_exposure_executor_dispatches_tracker_off_calling_thread(self): + respx.get(ENDPOINT).mock( + return_value=create_success_response( + { + "test_flag": SelectedVariant( + variant_key="treatment", variant_value="treatment" + ) + } + ) + ) + + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + calling_thread = threading.current_thread() + tracker_thread = threading.Event() + captured_thread: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured_thread.append(threading.current_thread()) + tracker_thread.set() + + tracker_mock = Mock(side_effect=tracker) + config = RemoteFlagsConfig(exposure_executor=executor) + provider = RemoteFeatureFlagsProvider( + "test-token", config, "1.0.0", tracker_mock + ) + try: + provider.get_variant_value( + "test_flag", "control", {"distinct_id": "user123"} + ) + assert tracker_thread.wait(timeout=2.0), "tracker never ran" + assert captured_thread[0] is not calling_thread + assert captured_thread[0].name.startswith("exposure") + finally: + provider.__exit__(None, None, None) + finally: + executor.shutdown(wait=True) + @respx.mock def test_is_enabled_returns_true_for_true_variant_value(self): respx.get(ENDPOINT).mock( diff --git a/mixpanel/flags/types.py b/mixpanel/flags/types.py index 8325439..09031b3 100644 --- a/mixpanel/flags/types.py +++ b/mixpanel/flags/types.py @@ -1,3 +1,4 @@ +from concurrent.futures import Executor from typing import Any, Optional from pydantic import BaseModel, ConfigDict @@ -10,6 +11,10 @@ class FlagsConfig(BaseModel): api_host: str = "api.mixpanel.com" request_timeout_in_seconds: int = 10 + # Optional executor used to dispatch exposure-event HTTP sends so flag + # evaluation does not block on the network round trip. None (default) + # preserves the existing inline behavior. + exposure_executor: Optional[Executor] = None class LocalFlagsConfig(FlagsConfig): diff --git a/openfeature-provider/README.md b/openfeature-provider/README.md index e472f90..e301df0 100644 --- a/openfeature-provider/README.md +++ b/openfeature-provider/README.md @@ -201,6 +201,32 @@ When your application is shutting down, call `shutdown()` to clean up resources: provider.shutdown() ``` +### Async Exposure Tracking + +By default, every flag evaluation tracks an exposure event inline — the `/track` HTTP round trip happens on the calling thread before `get_*_value()` returns. For latency-sensitive code paths, pass a `concurrent.futures.Executor` so exposure tracking runs off-thread: + +```python +from concurrent.futures import ThreadPoolExecutor +from mixpanel.flags.types import LocalFlagsConfig + +exposure_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="mixpanel-exposure") +provider = MixpanelProvider.from_local_config( + "YOUR_PROJECT_TOKEN", + LocalFlagsConfig(exposure_executor=exposure_executor), +) + +# Flag evaluation now returns as soon as the local logic finishes; +# the exposure POST runs on the executor. +client = api.get_client() +client.get_boolean_value("my-feature", False) + +# On shutdown, drain the executor so in-flight exposures get sent. +provider.shutdown() +exposure_executor.shutdown(wait=True) +``` + +The same option lives on `RemoteFlagsConfig`. Defaults to `None` (inline behavior); existing setups are unaffected. The async (`a*`) methods already dispatch exposure tracking via `asyncio.create_task` and ignore `exposure_executor`. + ## Context Mapping ### All Properties Passed Directly