Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion mixpanel/flags/local_feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,23 @@ def _track_exposure(
if latency_in_seconds is not None:
properties["Variant fetch latency (ms)"] = latency_in_seconds * 1000

self._tracker(distinct_id, EXPOSURE_EVENT, properties)
self._dispatch_exposure(distinct_id, properties)
else:
logger.error(
"Cannot track exposure event without a distinct_id in the context"
)

def _dispatch_exposure(self, distinct_id: str, properties: dict[str, Any]) -> None:
if (executor := self._config.exposure_executor) is not None:
try:
executor.submit(self._tracker, distinct_id, EXPOSURE_EVENT, properties)
except RuntimeError:
logger.exception(
"Exposure event dropped — executor refused to accept task"
)
return
self._tracker(distinct_id, EXPOSURE_EVENT, properties)

async def __aenter__(self):
return self

Expand Down
15 changes: 13 additions & 2 deletions mixpanel/flags/remote_feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def get_variant(
properties = self._build_tracking_properties(
flag_key, selected_variant, start_time, end_time
)
self._tracker(distinct_id, EXPOSURE_EVENT, properties)
self._dispatch_exposure(distinct_id, properties)

except Exception:
logger.exception("Failed to get remote variant for flag '%s'", flag_key)
Expand Down Expand Up @@ -267,12 +267,23 @@ def track_exposure_event(
"""
if distinct_id := context.get("distinct_id"):
properties = self._build_tracking_properties(flag_key, variant)
self._tracker(distinct_id, EXPOSURE_EVENT, properties)
self._dispatch_exposure(distinct_id, properties)
else:
logger.error(
"Cannot track exposure event without a distinct_id in the context"
)

def _dispatch_exposure(self, distinct_id: str, properties: dict[str, Any]) -> None:
if (executor := self._config.exposure_executor) is not None:
try:
executor.submit(self._tracker, distinct_id, EXPOSURE_EVENT, properties)
except RuntimeError:
logger.exception(
"Exposure event dropped — executor refused to accept task"
)
return
self._tracker(distinct_id, EXPOSURE_EVENT, properties)

def _prepare_query_params(
self, context: dict[str, Any], flag_key: str | None = None
) -> dict[str, str]:
Expand Down
78 changes: 78 additions & 0 deletions mixpanel/flags/test_local_feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from itertools import chain, repeat
from typing import Any
from unittest.mock import Mock, patch
Expand Down Expand Up @@ -646,6 +647,83 @@ async def test_get_variant_value_does_not_track_exposure_without_distinct_id(sel
)
self._mock_tracker.assert_not_called()

@respx.mock
async def test_default_exposure_runs_inline_on_calling_thread(self):
"""Smoke test: exposure_executor defaults to None, tracker runs inline."""
flag = create_test_flag(rollout_percentage=100.0)
await self.setup_flags([flag])

called_on: list[threading.Thread] = []

def tracker(_distinct_id, _event, _properties):
called_on.append(threading.current_thread())

self._mock_tracker.side_effect = tracker
self._flags.get_variant_value(TEST_FLAG_KEY, "fallback", USER_CONTEXT)
assert called_on == [threading.current_thread()]

@respx.mock
async def test_track_exposure_event_routes_through_executor(self):
"""Manual API also honors exposure_executor."""
executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure")
try:
tracker_done = threading.Event()
captured: list[threading.Thread] = []

def tracker(_distinct_id, _event, _properties):
captured.append(threading.current_thread())
tracker_done.set()

config = LocalFlagsConfig(enable_polling=False, exposure_executor=executor)
provider = LocalFeatureFlagsProvider(
"test-token", config, "1.0.0", Mock(side_effect=tracker)
)
try:
provider.track_exposure_event(
"manual",
SelectedVariant(variant_key="treatment", variant_value="x"),
USER_CONTEXT,
)
assert tracker_done.wait(timeout=2.0)
assert captured[0] is not threading.current_thread()
assert captured[0].name.startswith("exposure")
finally:
await provider.__aexit__(None, None, None)
finally:
executor.shutdown(wait=True)

@respx.mock
async def test_exposure_executor_dispatches_tracker_off_calling_thread(self):
flag = create_test_flag(rollout_percentage=100.0)
executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure")
try:
calling_thread = threading.current_thread()
tracker_thread = threading.Event()
captured_thread: list[threading.Thread] = []

def tracker(_distinct_id, _event, _properties):
captured_thread.append(threading.current_thread())
tracker_thread.set()

tracker_mock = Mock(side_effect=tracker)
config = LocalFlagsConfig(enable_polling=False, exposure_executor=executor)
provider = LocalFeatureFlagsProvider(
"test-token", config, "1.0.0", tracker_mock
)
try:
respx.get("https://api.mixpanel.com/flags/definitions").mock(
return_value=create_flags_response([flag])
)
await provider.astart_polling_for_definitions()
provider.get_variant_value(TEST_FLAG_KEY, "fallback", USER_CONTEXT)
assert tracker_thread.wait(timeout=2.0), "tracker never ran"
assert captured_thread[0] is not calling_thread
assert captured_thread[0].name.startswith("exposure")
finally:
await provider.__aexit__(None, None, None)
finally:
executor.shutdown(wait=True)

@respx.mock
async def test_get_all_variants_returns_all_variants_when_user_in_rollout(self):
flag1 = create_test_flag(flag_key="flag1", rollout_percentage=100.0)
Expand Down
92 changes: 92 additions & 0 deletions mixpanel/flags/test_remote_feature_flags.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock

import httpx
Expand Down Expand Up @@ -281,6 +283,96 @@ def test_get_variant_value_does_not_track_exposure_event_if_fallback(self):
)
self.mock_tracker.assert_not_called()

def test_default_exposure_runs_inline_on_calling_thread(self):
"""Smoke test: exposure_executor defaults to None, tracker runs inline."""
called_on: list[threading.Thread] = []

def tracker(_distinct_id, _event, _properties):
called_on.append(threading.current_thread())

provider = RemoteFeatureFlagsProvider(
"test-token", RemoteFlagsConfig(), "1.0.0", tracker
)
try:
provider.track_exposure_event(
"manual",
SelectedVariant(variant_key="treatment", variant_value="x"),
{"distinct_id": "user123"},
)
assert called_on == [threading.current_thread()]
finally:
provider.__exit__(None, None, None)

def test_track_exposure_event_routes_through_executor(self):
"""Manual API also honors exposure_executor."""
executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure")
try:
tracker_done = threading.Event()
captured: list[threading.Thread] = []

def tracker(_distinct_id, _event, _properties):
captured.append(threading.current_thread())
tracker_done.set()

provider = RemoteFeatureFlagsProvider(
"test-token",
RemoteFlagsConfig(exposure_executor=executor),
"1.0.0",
Mock(side_effect=tracker),
)
try:
provider.track_exposure_event(
"manual",
SelectedVariant(variant_key="treatment", variant_value="x"),
{"distinct_id": "user123"},
)
assert tracker_done.wait(timeout=2.0)
assert captured[0] is not threading.current_thread()
assert captured[0].name.startswith("exposure")
finally:
provider.__exit__(None, None, None)
finally:
executor.shutdown(wait=True)

@respx.mock
def test_exposure_executor_dispatches_tracker_off_calling_thread(self):
respx.get(ENDPOINT).mock(
return_value=create_success_response(
{
"test_flag": SelectedVariant(
variant_key="treatment", variant_value="treatment"
)
}
)
)

executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="exposure")
try:
calling_thread = threading.current_thread()
tracker_thread = threading.Event()
captured_thread: list[threading.Thread] = []

def tracker(_distinct_id, _event, _properties):
captured_thread.append(threading.current_thread())
tracker_thread.set()

tracker_mock = Mock(side_effect=tracker)
config = RemoteFlagsConfig(exposure_executor=executor)
provider = RemoteFeatureFlagsProvider(
"test-token", config, "1.0.0", tracker_mock
)
try:
provider.get_variant_value(
"test_flag", "control", {"distinct_id": "user123"}
)
assert tracker_thread.wait(timeout=2.0), "tracker never ran"
assert captured_thread[0] is not calling_thread
assert captured_thread[0].name.startswith("exposure")
finally:
provider.__exit__(None, None, None)
finally:
executor.shutdown(wait=True)

@respx.mock
def test_is_enabled_returns_true_for_true_variant_value(self):
respx.get(ENDPOINT).mock(
Expand Down
5 changes: 5 additions & 0 deletions mixpanel/flags/types.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from concurrent.futures import Executor
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict
Expand All @@ -10,6 +11,10 @@ class FlagsConfig(BaseModel):

api_host: str = "api.mixpanel.com"
request_timeout_in_seconds: int = 10
# Optional executor used to dispatch exposure-event HTTP sends so flag
# evaluation does not block on the network round trip. None (default)
# preserves the existing inline behavior.
exposure_executor: Optional[Executor] = None


class LocalFlagsConfig(FlagsConfig):
Expand Down
26 changes: 26 additions & 0 deletions openfeature-provider/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ When your application is shutting down, call `shutdown()` to clean up resources:
provider.shutdown()
```

### Async Exposure Tracking

By default, every flag evaluation tracks an exposure event inline — the `/track` HTTP round trip happens on the calling thread before `get_*_value()` returns. For latency-sensitive code paths, pass a `concurrent.futures.Executor` so exposure tracking runs off-thread:

```python
from concurrent.futures import ThreadPoolExecutor
from mixpanel.flags.types import LocalFlagsConfig

exposure_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="mixpanel-exposure")
provider = MixpanelProvider.from_local_config(
"YOUR_PROJECT_TOKEN",
LocalFlagsConfig(exposure_executor=exposure_executor),
)

# Flag evaluation now returns as soon as the local logic finishes;
# the exposure POST runs on the executor.
client = api.get_client()
client.get_boolean_value("my-feature", False)

# On shutdown, drain the executor so in-flight exposures get sent.
provider.shutdown()
exposure_executor.shutdown(wait=True)
```

The same option lives on `RemoteFlagsConfig`. Defaults to `None` (inline behavior); existing setups are unaffected. The async (`a*`) methods already dispatch exposure tracking via `asyncio.create_task` and ignore `exposure_executor`.

## Context Mapping

### All Properties Passed Directly
Expand Down
Loading