Skip to content

[FSSDK-11166] update: implement CMAB service #455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions optimizely/cmab/cmab_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2025 Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import json
import hashlib

from typing import Optional, List, TypedDict
from optimizely.cmab.cmab_client import DefaultCmabClient
from optimizely.odp.lru_cache import LRUCache
from optimizely.optimizely_user_context import OptimizelyUserContext, UserAttributes
from optimizely.project_config import ProjectConfig
from optimizely.decision.optimizely_decide_option import OptimizelyDecideOption
from optimizely import logger as _logging


class CmabDecision(TypedDict):
variation_id: str
cmab_uuid: str


class CmabCacheValue(TypedDict):
attributes_hash: str
variation_id: str
cmab_uuid: str


class DefaultCmabService:
def __init__(self, cmab_cache: LRUCache[str, CmabCacheValue],
cmab_client: DefaultCmabClient, logger: Optional[_logging.Logger] = None):
self.cmab_cache = cmab_cache
self.cmab_client = cmab_client
self.logger = logger

def get_decision(self, project_config: ProjectConfig, user_context: OptimizelyUserContext,
rule_id: str, options: List[str]) -> CmabDecision:

filtered_attributes = self._filter_attributes(project_config, user_context, rule_id)

if OptimizelyDecideOption.IGNORE_CMAB_CACHE in options:
return self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)

if OptimizelyDecideOption.RESET_CMAB_CACHE in options:
self.cmab_cache.reset()

cache_key = self._get_cache_key(user_context.user_id, rule_id)

if OptimizelyDecideOption.INVALIDATE_USER_CMAB_CACHE in options:
self.cmab_cache.remove(cache_key)

cached_value = self.cmab_cache.lookup(cache_key)

attributes_hash = self._hash_attributes(filtered_attributes)

if cached_value:
if cached_value['attributes_hash'] == attributes_hash:
return CmabDecision(variation_id=cached_value['variation_id'], cmab_uuid=cached_value['cmab_uuid'])
else:
self.cmab_cache.remove(cache_key)

cmab_decision = self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)
self.cmab_cache.save(cache_key, {
'attributes_hash': attributes_hash,
'variation_id': cmab_decision['variation_id'],
'cmab_uuid': cmab_decision['cmab_uuid'],
})
return cmab_decision

def _fetch_decision(self, rule_id: str, user_id: str, attributes: UserAttributes) -> CmabDecision:
cmab_uuid = str(uuid.uuid4())
variation_id = self.cmab_client.fetch_decision(rule_id, user_id, attributes, cmab_uuid)
cmab_decision = CmabDecision(variation_id=variation_id, cmab_uuid=cmab_uuid)
return cmab_decision

def _filter_attributes(self, project_config: ProjectConfig,
user_context: OptimizelyUserContext, rule_id: str) -> UserAttributes:
user_attributes = user_context.get_user_attributes()
filtered_user_attributes = UserAttributes({})

experiment = project_config.experiment_id_map.get(rule_id)
if not experiment or not experiment.cmab:
return filtered_user_attributes

cmab_attribute_ids = experiment.cmab['attributeIds']
for attribute_id in cmab_attribute_ids:
attribute = project_config.attribute_id_map.get(attribute_id)
if attribute and attribute.key in user_attributes:
filtered_user_attributes[attribute.key] = user_attributes[attribute.key]

return filtered_user_attributes

def _get_cache_key(self, user_id: str, rule_id: str) -> str:
return f"{len(user_id)}-{user_id}-{rule_id}"

def _hash_attributes(self, attributes: UserAttributes) -> str:
sorted_attrs = json.dumps(attributes, sort_keys=True)
return hashlib.md5(sorted_attrs.encode()).hexdigest()
3 changes: 3 additions & 0 deletions optimizely/decision/optimizely_decide_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ class OptimizelyDecideOption:
IGNORE_USER_PROFILE_SERVICE: Final = 'IGNORE_USER_PROFILE_SERVICE'
INCLUDE_REASONS: Final = 'INCLUDE_REASONS'
EXCLUDE_VARIABLES: Final = 'EXCLUDE_VARIABLES'
IGNORE_CMAB_CACHE: Final = "IGNORE_CMAB_CACHE"
RESET_CMAB_CACHE: Final = "RESET_CMAB_CACHE"
INVALIDATE_USER_CMAB_CACHE: Final = "INVALIDATE_USER_CMAB_CACHE"
3 changes: 3 additions & 0 deletions optimizely/project_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def __init__(self, datafile: str | bytes, logger: Logger, error_handler: Any):
self.attribute_id_to_key_map: dict[str, str] = {}
for attribute in self.attributes:
self.attribute_id_to_key_map[attribute['id']] = attribute['key']
self.attribute_id_map: dict[str, entities.Attribute] = self._generate_key_map(
self.attributes, 'id', entities.Attribute
)
self.audience_id_map: dict[str, entities.Audience] = self._generate_key_map(
self.audiences, 'id', entities.Audience
)
Expand Down
12 changes: 12 additions & 0 deletions tests/test_cmab_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# Copyright 2025, Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import json
from unittest.mock import MagicMock, patch, call
Expand Down
187 changes: 187 additions & 0 deletions tests/test_cmab_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright 2025, Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from unittest.mock import MagicMock
from optimizely.cmab.cmab_service import DefaultCmabService
from optimizely.optimizely_user_context import OptimizelyUserContext
from optimizely.decision.optimizely_decide_option import OptimizelyDecideOption
from optimizely.odp.lru_cache import LRUCache
from optimizely.cmab.cmab_client import DefaultCmabClient
from optimizely.project_config import ProjectConfig
from optimizely.entities import Attribute


class TestDefaultCmabService(unittest.TestCase):
def setUp(self):
self.mock_cmab_cache = MagicMock(spec=LRUCache)
self.mock_cmab_client = MagicMock(spec=DefaultCmabClient)
self.mock_logger = MagicMock()

self.cmab_service = DefaultCmabService(
cmab_cache=self.mock_cmab_cache,
cmab_client=self.mock_cmab_client,
logger=self.mock_logger
)

self.mock_project_config = MagicMock(spec=ProjectConfig)
self.mock_user_context = MagicMock(spec=OptimizelyUserContext)
self.mock_user_context.user_id = 'user123'
self.mock_user_context.get_user_attributes.return_value = {'age': 25, 'location': 'USA'}

# Setup mock experiment and attribute mapping
self.mock_project_config.experiment_id_map = {
'exp1': MagicMock(cmab={'attributeIds': ['66', '77']})
}
attr1 = Attribute(id="66", key="age")
attr2 = Attribute(id="77", key="location")
self.mock_project_config.attribute_id_map = {
"66": attr1,
"77": attr2
}

def test_returns_decision_from_cache_when_valid(self):
expected_key = self.cmab_service._get_cache_key("user123", "exp1")
expected_attributes = {"age": 25, "location": "USA"}
expected_hash = self.cmab_service._hash_attributes(expected_attributes)

self.mock_cmab_cache.lookup.return_value = {
"attributes_hash": expected_hash,
"variation_id": "varA",
"cmab_uuid": "uuid-123"
}

decision = self.cmab_service.get_decision(
self.mock_project_config, self.mock_user_context, "exp1", []
)

self.mock_cmab_cache.lookup.assert_called_once_with(expected_key)
self.assertEqual(decision["variation_id"], "varA")
self.assertEqual(decision["cmab_uuid"], "uuid-123")

def test_ignores_cache_when_option_given(self):
self.mock_cmab_client.fetch_decision.return_value = "varB"
expected_attributes = {"age": 25, "location": "USA"}

decision = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
[OptimizelyDecideOption.IGNORE_CMAB_CACHE]
)

self.assertEqual(decision["variation_id"], "varB")
self.assertIn('cmab_uuid', decision)
self.mock_cmab_client.fetch_decision.assert_called_once_with(
"exp1",
self.mock_user_context.user_id,
expected_attributes,
decision["cmab_uuid"]
)

def test_invalidates_user_cache_when_option_given(self):
self.mock_cmab_client.fetch_decision.return_value = "varC"
self.mock_cmab_cache.lookup.return_value = None
self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
[OptimizelyDecideOption.INVALIDATE_USER_CMAB_CACHE]
)

key = self.cmab_service._get_cache_key("user123", "exp1")
self.mock_cmab_cache.remove.assert_called_with(key)
self.mock_cmab_cache.remove.assert_called_once()

def test_resets_cache_when_option_given(self):
self.mock_cmab_client.fetch_decision.return_value = "varD"

decision = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
[OptimizelyDecideOption.RESET_CMAB_CACHE]
)

self.mock_cmab_cache.reset.assert_called_once()
self.assertEqual(decision["variation_id"], "varD")
self.assertIn('cmab_uuid', decision)

def test_new_decision_when_hash_changes(self):
self.mock_cmab_cache.lookup.return_value = {
"attributes_hash": "old_hash",
"variation_id": "varA",
"cmab_uuid": "uuid-123"
}
self.mock_cmab_client.fetch_decision.return_value = "varE"

expected_attribute = {"age": 25, "location": "USA"}
expected_hash = self.cmab_service._hash_attributes(expected_attribute)
expected_key = self.cmab_service._get_cache_key("user123", "exp1")

decision = self.cmab_service.get_decision(self.mock_project_config, self.mock_user_context, "exp1", [])
self.mock_cmab_cache.remove.assert_called_once_with(expected_key)
self.mock_cmab_cache.save.assert_called_once_with(
expected_key,
{
"cmab_uuid": decision["cmab_uuid"],
"variation_id": decision["variation_id"],
"attributes_hash": expected_hash
}
)
self.assertEqual(decision["variation_id"], "varE")
self.mock_cmab_client.fetch_decision.assert_called_once_with(
"exp1",
self.mock_user_context.user_id,
expected_attribute,
decision["cmab_uuid"]
)

def test_filter_attributes_returns_correct_subset(self):
filtered = self.cmab_service._filter_attributes(self.mock_project_config, self.mock_user_context, "exp1")
self.assertEqual(filtered["age"], 25)
self.assertEqual(filtered["location"], "USA")

def test_filter_attributes_empty_when_no_cmab(self):
self.mock_project_config.experiment_id_map["exp1"].cmab = None
filtered = self.cmab_service._filter_attributes(self.mock_project_config, self.mock_user_context, "exp1")
self.assertEqual(filtered, {})

def test_hash_attributes_produces_stable_output(self):
attrs = {"b": 2, "a": 1}
hash1 = self.cmab_service._hash_attributes(attrs)
hash2 = self.cmab_service._hash_attributes({"a": 1, "b": 2})
self.assertEqual(hash1, hash2)

def test_only_cmab_attributes_passed_to_client(self):
self.mock_user_context.get_user_attributes.return_value = {
'age': 25,
'location': 'USA',
'extra_attr': 'value', # This shouldn't be passed to CMAB
'another_extra': 123 # This shouldn't be passed to CMAB
}
self.mock_cmab_client.fetch_decision.return_value = "varF"

decision = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
[OptimizelyDecideOption.IGNORE_CMAB_CACHE]
)

# Verify only age and location are passed (attributes configured in setUp)
self.mock_cmab_client.fetch_decision.assert_called_once_with(
"exp1",
self.mock_user_context.user_id,
{"age": 25, "location": "USA"},
decision["cmab_uuid"]
)