Skip to content

New OTel Integration #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ sphinx
sphinx_rtd_theme
sphinx-toolbox
myst_parser
opentelemetry-api
opentelemetry-sdk
3 changes: 2 additions & 1 deletion featuremanagement/azuremonitor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from ._send_telemetry import publish_telemetry, track_event
from ._send_telemetry import publish_telemetry, track_event, TargetingSpanProcessor


__all__ = [
"publish_telemetry",
"track_event",
"TargetingSpanProcessor",
]
69 changes: 61 additions & 8 deletions featuremanagement/azuremonitor/_send_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,39 @@
# license information.
# --------------------------------------------------------------------------
import logging
from typing import Dict, Optional
from .._models import VariantAssignmentReason, EvaluationEvent
import inspect
from typing import Any, Callable, Dict, Optional
from .._models import VariantAssignmentReason, EvaluationEvent, TargetingContext

logger = logging.getLogger(__name__)

try:
from azure.monitor.events.extension import track_event as azure_monitor_track_event # type: ignore
from opentelemetry.context.context import Context
from opentelemetry.sdk.trace import Span, SpanProcessor

HAS_AZURE_MONITOR_EVENTS_EXTENSION = True
except ImportError:
HAS_AZURE_MONITOR_EVENTS_EXTENSION = False
logging.warning(
logger.warning(
"azure-monitor-events-extension is not installed. Telemetry will not be sent to Application Insights."
)
SpanProcessor = object # type: ignore
Span = object # type: ignore
Context = object # type: ignore

FEATURE_NAME = "FeatureName"
ENABLED = "Enabled"
TARGETING_ID = "TargetingId"
VARIANT = "Variant"
REASON = "VariantAssignmentReason"

DEFAULT_WHEN_ENABLED = "DefaultWhenEnabled"
VERSION = "Version"
VARIANT_ASSIGNMENT_PERCENTAGE = "VariantAssignmentPercentage"
MICROSOFT_TARGETING_ID = "Microsoft.TargetingId"
SPAN = "Span"

EVENT_NAME = "FeatureEvaluation"

EVALUATION_EVENT_VERSION = "1.1.0"
Expand Down Expand Up @@ -64,7 +78,7 @@ def publish_telemetry(evaluation_event: EvaluationEvent) -> None:
event: Dict[str, Optional[str]] = {
FEATURE_NAME: feature.name,
ENABLED: str(evaluation_event.enabled),
"Version": EVALUATION_EVENT_VERSION,
VERSION: EVALUATION_EVENT_VERSION,
}

reason = evaluation_event.reason
Expand All @@ -78,25 +92,64 @@ def publish_telemetry(evaluation_event: EvaluationEvent) -> None:
# VariantAllocationPercentage
allocation_percentage = 0
if reason == VariantAssignmentReason.DEFAULT_WHEN_ENABLED:
event["VariantAssignmentPercentage"] = str(100)
event[VARIANT_ASSIGNMENT_PERCENTAGE] = str(100)
if feature.allocation:
for allocation in feature.allocation.percentile:
allocation_percentage += allocation.percentile_to - allocation.percentile_from
event["VariantAssignmentPercentage"] = str(100 - allocation_percentage)
event[VARIANT_ASSIGNMENT_PERCENTAGE] = str(100 - allocation_percentage)
elif reason == VariantAssignmentReason.PERCENTILE:
if feature.allocation and feature.allocation.percentile:
for allocation in feature.allocation.percentile:
if variant and allocation.variant == variant.name:
allocation_percentage += allocation.percentile_to - allocation.percentile_from
event["VariantAssignmentPercentage"] = str(allocation_percentage)
event[VARIANT_ASSIGNMENT_PERCENTAGE] = str(allocation_percentage)

# DefaultWhenEnabled
if feature.allocation and feature.allocation.default_when_enabled:
event["DefaultWhenEnabled"] = feature.allocation.default_when_enabled
event[DEFAULT_WHEN_ENABLED] = feature.allocation.default_when_enabled

if feature.telemetry:
for metadata_key, metadata_value in feature.telemetry.metadata.items():
if metadata_key not in event:
event[metadata_key] = metadata_value

track_event(EVENT_NAME, evaluation_event.user, event_properties=event)


class TargetingSpanProcessor(SpanProcessor):
"""
A custom SpanProcessor that attaches the targeting ID to the span and baggage when a new span is started.
:keyword Callable[[], TargetingContext] targeting_context_accessor: Callback function to get the current targeting
context if one isn't provided.
"""

def __init__(self, **kwargs: Any) -> None:
self._targeting_context_accessor: Optional[Callable[[], TargetingContext]] = kwargs.pop(
"targeting_context_accessor", None
)

def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
"""
Attaches the targeting ID to the span and baggage when a new span is started.

:param Span span: The span that was started.
:param parent_context: The parent context of the span.
"""
if not HAS_AZURE_MONITOR_EVENTS_EXTENSION:
logger.warning("Azure Monitor Events Extension is not installed.")
return
if self._targeting_context_accessor and callable(self._targeting_context_accessor):
if inspect.iscoroutinefunction(self._targeting_context_accessor):
logger.warning("Async targeting_context_accessor is not supported.")
return
targeting_context = self._targeting_context_accessor()
if not targeting_context or not isinstance(targeting_context, TargetingContext):
logger.warning(
"targeting_context_accessor did not return a TargetingContext. Received type %s.",
type(targeting_context),
)
return
if not targeting_context.user_id:
logger.debug("TargetingContext does not have a user ID.")
return
span.set_attribute(TARGETING_ID, targeting_context.user_id)
1 change: 1 addition & 0 deletions project-words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ featuremanagerbase
quickstart
rtype
usefixtures
urandom
60 changes: 60 additions & 0 deletions samples/quarty_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import uuid
import os
from quart import Quart, request, session
from quart.sessions import SecureCookieSessionInterface
from azure.appconfiguration.provider import load
from azure.identity import DefaultAzureCredential
from azure.monitor.opentelemetry import configure_azure_monitor
from featuremanagement.aio import FeatureManager
from featuremanagement import TargetingContext
from featuremanagement.azuremonitor import TargetingSpanProcessor


# A callback for assigning a TargetingContext for both Telemetry logs and Feature Flag evaluation
async def my_targeting_accessor() -> TargetingContext:
session_id = ""
if "Session-ID" in request.headers:
session_id = request.headers["Session-ID"]
return TargetingContext(user_id=session_id)


# Configure Azure Monitor
configure_azure_monitor(
connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"),
span_processors=[TargetingSpanProcessor(targeting_context_accessor=my_targeting_accessor)],
)

app = Quart(__name__)
app.session_interface = SecureCookieSessionInterface()
app.secret_key = os.urandom(24)

endpoint = os.environ.get("APPCONFIGURATION_ENDPOINT_STRING")
credential = DefaultAzureCredential()

# Connecting to Azure App Configuration using AAD
config = load(endpoint=endpoint, credential=credential, feature_flag_enabled=True, feature_flag_refresh_enabled=True)

# Load feature flags and set up targeting context accessor
feature_manager = FeatureManager(config, targeting_context_accessor=my_targeting_accessor)


@app.before_request
async def before_request():
if "session_id" not in session:
session["session_id"] = str(uuid.uuid4()) # Generate a new session ID
request.headers["Session-ID"] = session["session_id"]


@app.route("/")
async def hello():
variant = await feature_manager.get_variant("Message")
return str(variant.configuration if variant else "No variant found")


app.run()
2 changes: 2 additions & 0 deletions samples/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ featuremanagement
azure-appconfiguration-provider
azure-monitor-opentelemetry
azure-monitor-events-extension
quart
azure-identity
2 changes: 1 addition & 1 deletion tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
azure-monitor-opentelemetry
azure-monitor-events-extension
azure-monitor-events-extension
52 changes: 51 additions & 1 deletion tests/test_send_telemetry_appinsights.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import logging
from unittest.mock import patch
import pytest
from featuremanagement import EvaluationEvent, FeatureFlag, Variant, VariantAssignmentReason
from featuremanagement import EvaluationEvent, FeatureFlag, Variant, VariantAssignmentReason, TargetingContext
import featuremanagement.azuremonitor._send_telemetry
from featuremanagement.azuremonitor import TargetingSpanProcessor


@pytest.mark.usefixtures("caplog")
class TestSendTelemetryAppinsights:

user_id = None

def test_send_telemetry_appinsights(self):
feature_flag = FeatureFlag.convert_from_json(
{
Expand Down Expand Up @@ -211,3 +215,49 @@ def test_send_telemetry_appinsights_allocation(self):
assert mock_track_event.call_args[0][1]["VariantAssignmentReason"] == "Percentile"
assert mock_track_event.call_args[0][1]["VariantAssignmentPercentage"] == "25"
assert "DefaultWhenEnabled" not in mock_track_event.call_args[0][1]

def test_targeting_span_processor(self, caplog):
processor = TargetingSpanProcessor()
processor.on_start(None)
assert "" in caplog.text
caplog.clear()

processor = TargetingSpanProcessor(targeting_context_accessor="not callable")
processor.on_start(None)
assert "" in caplog.text
caplog.clear()

processor = TargetingSpanProcessor(targeting_context_accessor=self.bad_targeting_context_accessor)
processor.on_start(None)
assert (
"targeting_context_accessor did not return a TargetingContext. Received type <class 'str'>." in caplog.text
)
caplog.clear()

processor = TargetingSpanProcessor(targeting_context_accessor=self.async_targeting_context_accessor)
processor.on_start(None)
assert "Async targeting_context_accessor is not supported." in caplog.text
caplog.clear()

processor = TargetingSpanProcessor(targeting_context_accessor=self.accessor_callback)
logging.getLogger().setLevel(logging.DEBUG)
processor.on_start(None)
assert "TargetingContext does not have a user ID." in caplog.text
caplog.clear()

with patch("opentelemetry.sdk.trace.Span") as mock_span:
self.user_id = "test_user"
processor.on_start(mock_span)
assert mock_span.set_attribute.call_args[0][0] == "TargetingId"
assert mock_span.set_attribute.call_args[0][1] == "test_user"

self.user_id = None

def bad_targeting_context_accessor(self):
return "not targeting context"

async def async_targeting_context_accessor(self):
return TargetingContext(user_id=self.user_id)

def accessor_callback(self):
return TargetingContext(user_id=self.user_id)