From 4406e0677c5dee304ac3c40c9d181ed61f0e3887 Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Tue, 30 Jun 2026 13:07:20 -0400 Subject: [PATCH 1/3] fix(analytics): allow capability to offload reportExposure to async thread Exposure tracking on the sync path called the user's tracker (and therefore an HTTP POST) inline, blocking every getVariant / getVariantValue / isEnabled call by the full /track round trip. The async paths already use asyncio.create_task; only the sync path was paying the cost. Add an optional `exposure_executor: concurrent.futures.Executor` field to FlagsConfig. When set, the sync providers dispatch the tracker call via executor.submit so flag evaluation returns as soon as the local logic finishes. None (the default) preserves the existing inline behavior. Mirrors mixpanel-java#85. Co-Authored-By: Claude Opus 4.7 --- mixpanel/flags/local_feature_flags.py | 13 ++++++- mixpanel/flags/remote_feature_flags.py | 15 +++++++- mixpanel/flags/test_local_feature_flags.py | 35 ++++++++++++++++++ mixpanel/flags/test_remote_feature_flags.py | 41 +++++++++++++++++++++ mixpanel/flags/types.py | 5 +++ 5 files changed, 106 insertions(+), 3 deletions(-) diff --git a/mixpanel/flags/local_feature_flags.py b/mixpanel/flags/local_feature_flags.py index ed13ee0..5cc1e73 100644 --- a/mixpanel/flags/local_feature_flags.py +++ b/mixpanel/flags/local_feature_flags.py @@ -511,12 +511,23 @@ def _track_exposure( if latency_in_seconds is not None: properties["Variant fetch latency (ms)"] = latency_in_seconds * 1000 - self._tracker(distinct_id, EXPOSURE_EVENT, properties) + self._dispatch_exposure(distinct_id, properties) else: logger.error( "Cannot track exposure event without a distinct_id in the context" ) + def _dispatch_exposure(self, distinct_id: str, properties: dict[str, Any]) -> None: + if (executor := self._config.exposure_executor) is not None: + try: + executor.submit(self._tracker, distinct_id, EXPOSURE_EVENT, properties) + except RuntimeError: + logger.exception( + "Exposure event dropped — executor refused to accept task" + ) + return + self._tracker(distinct_id, EXPOSURE_EVENT, properties) + async def __aenter__(self): return self diff --git a/mixpanel/flags/remote_feature_flags.py b/mixpanel/flags/remote_feature_flags.py index b26f4af..af0d40a 100644 --- a/mixpanel/flags/remote_feature_flags.py +++ b/mixpanel/flags/remote_feature_flags.py @@ -237,7 +237,7 @@ def get_variant( properties = self._build_tracking_properties( flag_key, selected_variant, start_time, end_time ) - self._tracker(distinct_id, EXPOSURE_EVENT, properties) + self._dispatch_exposure(distinct_id, properties) except Exception: logger.exception("Failed to get remote variant for flag '%s'", flag_key) @@ -267,12 +267,23 @@ def track_exposure_event( """ if distinct_id := context.get("distinct_id"): properties = self._build_tracking_properties(flag_key, variant) - self._tracker(distinct_id, EXPOSURE_EVENT, properties) + self._dispatch_exposure(distinct_id, properties) else: logger.error( "Cannot track exposure event without a distinct_id in the context" ) + def _dispatch_exposure(self, distinct_id: str, properties: dict[str, Any]) -> None: + if (executor := self._config.exposure_executor) is not None: + try: + executor.submit(self._tracker, distinct_id, EXPOSURE_EVENT, properties) + except RuntimeError: + logger.exception( + "Exposure event dropped — executor refused to accept task" + ) + return + self._tracker(distinct_id, EXPOSURE_EVENT, properties) + def _prepare_query_params( self, context: dict[str, Any], flag_key: str | None = None ) -> dict[str, str]: diff --git a/mixpanel/flags/test_local_feature_flags.py b/mixpanel/flags/test_local_feature_flags.py index 592e3ee..3c49a04 100644 --- a/mixpanel/flags/test_local_feature_flags.py +++ b/mixpanel/flags/test_local_feature_flags.py @@ -2,6 +2,7 @@ import asyncio import threading +from concurrent.futures import ThreadPoolExecutor from itertools import chain, repeat from typing import Any from unittest.mock import Mock, patch @@ -646,6 +647,40 @@ async def test_get_variant_value_does_not_track_exposure_without_distinct_id(sel ) self._mock_tracker.assert_not_called() + @respx.mock + async def test_exposure_executor_dispatches_tracker_off_calling_thread(self): + flag = create_test_flag(rollout_percentage=100.0) + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + calling_thread = threading.current_thread() + tracker_thread = threading.Event() + captured_thread: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured_thread.append(threading.current_thread()) + tracker_thread.set() + + tracker_mock = Mock(side_effect=tracker) + config = LocalFlagsConfig( + enable_polling=False, exposure_executor=executor + ) + provider = LocalFeatureFlagsProvider( + "test-token", config, "1.0.0", tracker_mock + ) + try: + respx.get("https://api.mixpanel.com/flags/definitions").mock( + return_value=create_flags_response([flag]) + ) + await provider.astart_polling_for_definitions() + provider.get_variant_value(TEST_FLAG_KEY, "fallback", USER_CONTEXT) + assert tracker_thread.wait(timeout=2.0), "tracker never ran" + assert captured_thread[0] is not calling_thread + assert captured_thread[0].name.startswith("exposure") + finally: + await provider.__aexit__(None, None, None) + finally: + executor.shutdown(wait=True) + @respx.mock async def test_get_all_variants_returns_all_variants_when_user_in_rollout(self): flag1 = create_test_flag(flag_key="flag1", rollout_percentage=100.0) diff --git a/mixpanel/flags/test_remote_feature_flags.py b/mixpanel/flags/test_remote_feature_flags.py index 5b5dd71..71dce30 100644 --- a/mixpanel/flags/test_remote_feature_flags.py +++ b/mixpanel/flags/test_remote_feature_flags.py @@ -1,6 +1,8 @@ from __future__ import annotations import asyncio +import threading +from concurrent.futures import ThreadPoolExecutor from unittest.mock import Mock import httpx @@ -281,6 +283,45 @@ def test_get_variant_value_does_not_track_exposure_event_if_fallback(self): ) self.mock_tracker.assert_not_called() + @respx.mock + def test_exposure_executor_dispatches_tracker_off_calling_thread(self): + respx.get(ENDPOINT).mock( + return_value=create_success_response( + { + "test_flag": SelectedVariant( + variant_key="treatment", variant_value="treatment" + ) + } + ) + ) + + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + calling_thread = threading.current_thread() + tracker_thread = threading.Event() + captured_thread: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured_thread.append(threading.current_thread()) + tracker_thread.set() + + tracker_mock = Mock(side_effect=tracker) + config = RemoteFlagsConfig(exposure_executor=executor) + provider = RemoteFeatureFlagsProvider( + "test-token", config, "1.0.0", tracker_mock + ) + try: + provider.get_variant_value( + "test_flag", "control", {"distinct_id": "user123"} + ) + assert tracker_thread.wait(timeout=2.0), "tracker never ran" + assert captured_thread[0] is not calling_thread + assert captured_thread[0].name.startswith("exposure") + finally: + provider.__exit__(None, None, None) + finally: + executor.shutdown(wait=True) + @respx.mock def test_is_enabled_returns_true_for_true_variant_value(self): respx.get(ENDPOINT).mock( diff --git a/mixpanel/flags/types.py b/mixpanel/flags/types.py index 8325439..09031b3 100644 --- a/mixpanel/flags/types.py +++ b/mixpanel/flags/types.py @@ -1,3 +1,4 @@ +from concurrent.futures import Executor from typing import Any, Optional from pydantic import BaseModel, ConfigDict @@ -10,6 +11,10 @@ class FlagsConfig(BaseModel): api_host: str = "api.mixpanel.com" request_timeout_in_seconds: int = 10 + # Optional executor used to dispatch exposure-event HTTP sends so flag + # evaluation does not block on the network round trip. None (default) + # preserves the existing inline behavior. + exposure_executor: Optional[Executor] = None class LocalFlagsConfig(FlagsConfig): From 29ac119dddba52a58ad84a4a6c936e6f4d515a39 Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Tue, 30 Jun 2026 14:11:37 -0400 Subject: [PATCH 2/3] docs+test(analytics): exposure_executor README example + broader coverage - Add an "Async Exposure Tracking" section to the openfeature-provider README showing how to configure a ThreadPoolExecutor. - Add tests covering the manual track_exposure_event API path (previously only the implicit-via-get_variant path was covered). - Add explicit tests asserting the default (None) still runs inline on the calling thread. Co-Authored-By: Claude Opus 4.7 --- mixpanel/flags/test_local_feature_flags.py | 47 +++++++++++++++++++ mixpanel/flags/test_remote_feature_flags.py | 51 +++++++++++++++++++++ openfeature-provider/README.md | 26 +++++++++++ 3 files changed, 124 insertions(+) diff --git a/mixpanel/flags/test_local_feature_flags.py b/mixpanel/flags/test_local_feature_flags.py index 3c49a04..f1f58e9 100644 --- a/mixpanel/flags/test_local_feature_flags.py +++ b/mixpanel/flags/test_local_feature_flags.py @@ -647,6 +647,53 @@ async def test_get_variant_value_does_not_track_exposure_without_distinct_id(sel ) self._mock_tracker.assert_not_called() + @respx.mock + async def test_default_exposure_runs_inline_on_calling_thread(self): + """Smoke test: exposure_executor defaults to None, tracker runs inline.""" + flag = create_test_flag(rollout_percentage=100.0) + await self.setup_flags([flag]) + + called_on: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + called_on.append(threading.current_thread()) + + self._mock_tracker.side_effect = tracker + self._flags.get_variant_value(TEST_FLAG_KEY, "fallback", USER_CONTEXT) + assert called_on == [threading.current_thread()] + + @respx.mock + async def test_track_exposure_event_routes_through_executor(self): + """Manual API also honors exposure_executor.""" + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + tracker_done = threading.Event() + captured: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured.append(threading.current_thread()) + tracker_done.set() + + config = LocalFlagsConfig( + enable_polling=False, exposure_executor=executor + ) + provider = LocalFeatureFlagsProvider( + "test-token", config, "1.0.0", Mock(side_effect=tracker) + ) + try: + provider.track_exposure_event( + "manual", + SelectedVariant(variant_key="treatment", variant_value="x"), + USER_CONTEXT, + ) + assert tracker_done.wait(timeout=2.0) + assert captured[0] is not threading.current_thread() + assert captured[0].name.startswith("exposure") + finally: + await provider.__aexit__(None, None, None) + finally: + executor.shutdown(wait=True) + @respx.mock async def test_exposure_executor_dispatches_tracker_off_calling_thread(self): flag = create_test_flag(rollout_percentage=100.0) diff --git a/mixpanel/flags/test_remote_feature_flags.py b/mixpanel/flags/test_remote_feature_flags.py index 71dce30..164ba5a 100644 --- a/mixpanel/flags/test_remote_feature_flags.py +++ b/mixpanel/flags/test_remote_feature_flags.py @@ -283,6 +283,57 @@ def test_get_variant_value_does_not_track_exposure_event_if_fallback(self): ) self.mock_tracker.assert_not_called() + def test_default_exposure_runs_inline_on_calling_thread(self): + """Smoke test: exposure_executor defaults to None, tracker runs inline.""" + called_on: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + called_on.append(threading.current_thread()) + + provider = RemoteFeatureFlagsProvider( + "test-token", RemoteFlagsConfig(), "1.0.0", tracker + ) + try: + provider.track_exposure_event( + "manual", + SelectedVariant(variant_key="treatment", variant_value="x"), + {"distinct_id": "user123"}, + ) + assert called_on == [threading.current_thread()] + finally: + provider.__exit__(None, None, None) + + def test_track_exposure_event_routes_through_executor(self): + """Manual API also honors exposure_executor.""" + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure") + try: + tracker_done = threading.Event() + captured: list[threading.Thread] = [] + + def tracker(_distinct_id, _event, _properties): + captured.append(threading.current_thread()) + tracker_done.set() + + provider = RemoteFeatureFlagsProvider( + "test-token", + RemoteFlagsConfig(exposure_executor=executor), + "1.0.0", + Mock(side_effect=tracker), + ) + try: + provider.track_exposure_event( + "manual", + SelectedVariant(variant_key="treatment", variant_value="x"), + {"distinct_id": "user123"}, + ) + assert tracker_done.wait(timeout=2.0) + assert captured[0] is not threading.current_thread() + assert captured[0].name.startswith("exposure") + finally: + provider.__exit__(None, None, None) + finally: + executor.shutdown(wait=True) + @respx.mock def test_exposure_executor_dispatches_tracker_off_calling_thread(self): respx.get(ENDPOINT).mock( diff --git a/openfeature-provider/README.md b/openfeature-provider/README.md index e472f90..e301df0 100644 --- a/openfeature-provider/README.md +++ b/openfeature-provider/README.md @@ -201,6 +201,32 @@ When your application is shutting down, call `shutdown()` to clean up resources: provider.shutdown() ``` +### Async Exposure Tracking + +By default, every flag evaluation tracks an exposure event inline — the `/track` HTTP round trip happens on the calling thread before `get_*_value()` returns. For latency-sensitive code paths, pass a `concurrent.futures.Executor` so exposure tracking runs off-thread: + +```python +from concurrent.futures import ThreadPoolExecutor +from mixpanel.flags.types import LocalFlagsConfig + +exposure_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="mixpanel-exposure") +provider = MixpanelProvider.from_local_config( + "YOUR_PROJECT_TOKEN", + LocalFlagsConfig(exposure_executor=exposure_executor), +) + +# Flag evaluation now returns as soon as the local logic finishes; +# the exposure POST runs on the executor. +client = api.get_client() +client.get_boolean_value("my-feature", False) + +# On shutdown, drain the executor so in-flight exposures get sent. +provider.shutdown() +exposure_executor.shutdown(wait=True) +``` + +The same option lives on `RemoteFlagsConfig`. Defaults to `None` (inline behavior); existing setups are unaffected. The async (`a*`) methods already dispatch exposure tracking via `asyncio.create_task` and ignore `exposure_executor`. + ## Context Mapping ### All Properties Passed Directly From f810e2f316800805c52cb4a89e0104a75b15149d Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Tue, 30 Jun 2026 14:20:56 -0400 Subject: [PATCH 3/3] chore: ruff format --- mixpanel/flags/test_local_feature_flags.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/mixpanel/flags/test_local_feature_flags.py b/mixpanel/flags/test_local_feature_flags.py index f1f58e9..2177df1 100644 --- a/mixpanel/flags/test_local_feature_flags.py +++ b/mixpanel/flags/test_local_feature_flags.py @@ -674,9 +674,7 @@ def tracker(_distinct_id, _event, _properties): captured.append(threading.current_thread()) tracker_done.set() - config = LocalFlagsConfig( - enable_polling=False, exposure_executor=executor - ) + config = LocalFlagsConfig(enable_polling=False, exposure_executor=executor) provider = LocalFeatureFlagsProvider( "test-token", config, "1.0.0", Mock(side_effect=tracker) ) @@ -708,9 +706,7 @@ def tracker(_distinct_id, _event, _properties): tracker_thread.set() tracker_mock = Mock(side_effect=tracker) - config = LocalFlagsConfig( - enable_polling=False, exposure_executor=executor - ) + config = LocalFlagsConfig(enable_polling=False, exposure_executor=executor) provider = LocalFeatureFlagsProvider( "test-token", config, "1.0.0", tracker_mock )