Skip to content

Commit 2267e09

Browse files
committed
addressing comments
1 parent 21dd7ac commit 2267e09

File tree

6 files changed: +126 -112 lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_rule_cache.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import copy
43
import datetime
54
from logging import getLogger
65
from threading import Lock
76
from typing import Optional, Sequence
87

98
from amazon.opentelemetry.distro.sampler._fallback_sampler import _FallbackSampler
10-
from amazon.opentelemetry.distro.sampler._rule import _Rule
119
from amazon.opentelemetry.distro.sampler._sampling_rule import _SamplingRule
10+
from amazon.opentelemetry.distro.sampler._sampling_rule_applier import _SamplingRuleApplier
1211
from opentelemetry.context import Context
1312
from opentelemetry.sdk.resources import Resource
1413
from opentelemetry.sdk.trace.sampling import SamplingResult
@@ -22,9 +21,8 @@
2221

2322

2423
class _RuleCache:
25-
rules: [_Rule] = []
26-
2724
def __init__(self, resource: Resource, fallback_sampler: _FallbackSampler, date_time: datetime, lock: Lock):
25+
self.__rule_appliers: [_SamplingRuleApplier] = []
2826
self.__cache_lock = lock
2927
self.__resource = resource
3028
self._fallback_sampler = fallback_sampler
@@ -41,9 +39,9 @@ def should_sample(
4139
links: Sequence[Link] = None,
4240
trace_state: TraceState = None,
4341
) -> SamplingResult:
44-
for rule in self.rules:
45-
if rule.matches(self.__resource, attributes):
46-
return rule.should_sample(
42+
for rule_applier in self.__rule_appliers:
43+
if rule_applier.matches(self.__resource, attributes):
44+
return rule_applier.should_sample(
4745
parent_context,
4846
trace_id,
4947
name,
@@ -59,29 +57,29 @@ def should_sample(
5957

6058
def update_sampling_rules(self, new_sampling_rules: [_SamplingRule]) -> None:
6159
new_sampling_rules.sort()
62-
temp_rules = []
60+
temp_rule_appliers = []
6361
for sampling_rule in new_sampling_rules:
6462
if sampling_rule.RuleName == "":
65-
_logger.info("sampling rule without rule name is not supported")
63+
_logger.debug("sampling rule without rule name is not supported")
6664
continue
6765
if sampling_rule.Version != 1:
68-
_logger.info("sampling rule without Version 1 is not supported: RuleName: %s", sampling_rule.RuleName)
66+
_logger.debug("sampling rule without Version 1 is not supported: RuleName: %s", sampling_rule.RuleName)
6967
continue
70-
temp_rules.append(_Rule(copy.deepcopy(sampling_rule)))
68+
temp_rule_appliers.append(_SamplingRuleApplier(sampling_rule))
7169

7270
self.__cache_lock.acquire()
7371

74-
# map list of rules by each rule's sampling_rule name
75-
rule_map = {rule.sampling_rule.RuleName: rule for rule in self.rules}
72+
# map list of rule appliers by each applier's sampling_rule name
73+
rule_applier_map = {rule.sampling_rule.RuleName: rule for rule in self.__rule_appliers}
7674

77-
# If a sampling rule has not changed, keep its respective rule in the cache.
78-
for index, new_rule in enumerate(temp_rules):
79-
rule_name_to_check = new_rule.sampling_rule.RuleName
80-
if rule_name_to_check in rule_map:
81-
previous_rule = rule_map[rule_name_to_check]
82-
if new_rule.sampling_rule == previous_rule.sampling_rule:
83-
temp_rules[index] = previous_rule
84-
self.rules = temp_rules
75+
# If a sampling rule has not changed, keep its respective applier in the cache.
76+
for index, new_applier in enumerate(temp_rule_appliers):
77+
rule_name_to_check = new_applier.sampling_rule.RuleName
78+
if rule_name_to_check in rule_applier_map:
79+
old_applier = rule_applier_map[rule_name_to_check]
80+
if new_applier.sampling_rule == old_applier.sampling_rule:
81+
temp_rule_appliers[index] = old_applier
82+
self.__rule_appliers = temp_rule_appliers
8583
self._last_modified = datetime.datetime.now()
8684

8785
self.__cache_lock.release()
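
aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_sampling_rule_applier.py (file header missing from the capture; path inferred from the import of `_sampling_rule_applier` and the `_Rule` -> `_SamplingRuleApplier` rename shown in this diff)

Original file line numberDiff line numberDiff line change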
@@ -1,20 +1,21 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
33
from typing import Optional, Sequence
4+
from urllib.parse import urlparse
45

56
from amazon.opentelemetry.distro.sampler._matcher import _Matcher, cloud_platform_mapping
67
from amazon.opentelemetry.distro.sampler._sampling_rule import _SamplingRule
78
from opentelemetry.context import Context
89
from opentelemetry.sdk.resources import Resource
910
from opentelemetry.sdk.trace.sampling import ALWAYS_ON, SamplingResult
10-
from opentelemetry.semconv.resource import ResourceAttributes
11+
from opentelemetry.semconv.resource import CloudPlatformValues, ResourceAttributes
1112
from opentelemetry.semconv.trace import SpanAttributes
1213
from opentelemetry.trace import Link, SpanKind
1314
from opentelemetry.trace.span import TraceState
1415
from opentelemetry.util.types import Attributes
1516

1617

17-
class _Rule:
18+
class _SamplingRuleApplier:
1819
def __init__(self, sampling_rule: _SamplingRule):
1920
self.sampling_rule = sampling_rule
2021
# TODO add self.next_target_fetch_time from maybe time.process_time() or cache's datetime object
@@ -39,48 +40,49 @@ def should_sample(
3940
)
4041

4142
def matches(self, resource: Resource, attributes: Attributes) -> bool:
42-
http_target = None
43-
http_url = None
44-
http_method = None
45-
http_host = None
43+
url_path = None
44+
url_full = None
45+
http_request_method = None
46+
server_address = None
4647
service_name = None
4748

4849
if attributes is not None:
49-
http_target = attributes.get(SpanAttributes.HTTP_TARGET, None)
50-
http_method = attributes.get(SpanAttributes.HTTP_METHOD, None)
51-
http_url = attributes.get(SpanAttributes.HTTP_URL, None)
52-
http_host = attributes.get(SpanAttributes.HTTP_HOST, None)
53-
# NOTE: The above span attribute keys are deprecated in favor of:
54-
# URL_PATH/URL_QUERY, HTTP_REQUEST_METHOD, URL_FULL, SERVER_ADDRESS/SERVER_PORT
55-
# For now, the old attribute keys are kept for consistency with other centralized samplers
50+
url_path = attributes.get(SpanAttributes.URL_PATH, None)
51+
url_full = attributes.get(SpanAttributes.URL_FULL, None)
52+
http_request_method = attributes.get(SpanAttributes.HTTP_REQUEST_METHOD, None)
53+
server_address = attributes.get(SpanAttributes.SERVER_ADDRESS, None)
5654

5755
# Resource shouldn't be none as it should default to empty resource
5856
if resource is not None:
5957
service_name = resource.attributes.get(ResourceAttributes.SERVICE_NAME, "")
6058

6159
# target may be in url
62-
if http_target is None and http_url is not None:
63-
scheme_end_index = http_url.find("://")
64-
# Per spec, http.url is always populated with scheme://host/target. If scheme doesn't
65-
# match, assume it's bad instrumentation and ignore.
60+
if url_path is None and url_full is not None:
61+
scheme_end_index = url_full.find("://")
62+
# For network calls, URL usually has `scheme://host[:port][path][?query][#fragment]` format
63+
# Per spec, url.full is always populated with scheme://host/target.
64+
# If scheme doesn't match, assume it's bad instrumentation and ignore.
6665
if scheme_end_index > -1:
67-
path_index = http_url.find("/", scheme_end_index + len("://"))
68-
if path_index == -1:
69-
http_target = "/"
70-
else:
71-
http_target = http_url[path_index:]
66+
# urlparse("scheme://netloc/path;parameters?query#fragment")
67+
url_path = urlparse(url_full).path
68+
if url_path == "":
69+
url_path = "/"
70+
elif url_path is None and url_full is None:
71+
# When missing, the URL Path is assumed to be /
72+
url_path = "/"
7273

7374
return (
7475
_Matcher.attribute_match(attributes, self.sampling_rule.Attributes)
75-
and _Matcher.wild_card_match(http_target, self.sampling_rule.URLPath)
76-
and _Matcher.wild_card_match(http_method, self.sampling_rule.HTTPMethod)
77-
and _Matcher.wild_card_match(http_host, self.sampling_rule.Host)
76+
and _Matcher.wild_card_match(url_path, self.sampling_rule.URLPath)
77+
and _Matcher.wild_card_match(http_request_method, self.sampling_rule.HTTPMethod)
78+
and _Matcher.wild_card_match(server_address, self.sampling_rule.Host)
7879
and _Matcher.wild_card_match(service_name, self.sampling_rule.ServiceName)
79-
and _Matcher.wild_card_match(self.get_service_type(resource), self.sampling_rule.ServiceType)
80+
and _Matcher.wild_card_match(self.__get_service_type(resource), self.sampling_rule.ServiceType)
81+
and _Matcher.wild_card_match(self.__get_arn(resource, attributes), self.sampling_rule.ResourceARN)
8082
)
8183

8284
# pylint: disable=no-self-use
83-
def get_service_type(self, resource: Resource) -> str:
85+
def __get_service_type(self, resource: Resource) -> str:
8486
if resource is None:
8587
return ""
8688

@@ -89,3 +91,17 @@ def get_service_type(self, resource: Resource) -> str:
8991
return ""
9092

9193
return cloud_platform_mapping.get(cloud_platform, "")
94+
95+
# pylint: disable=no-self-use
96+
def __get_arn(self, resource: Resource, attributes: Attributes) -> str:
97+
if resource is not None:
98+
arn = resource.attributes.get(ResourceAttributes.AWS_ECS_CONTAINER_ARN, None)
99+
if arn is not None:
100+
return arn
101+
if attributes is not None and self.__get_service_type(resource=resource) == cloud_platform_mapping.get(
102+
CloudPlatformValues.AWS_LAMBDA.value
103+
):
104+
arn = attributes.get(SpanAttributes.CLOUD_RESOURCE_ID, None)
105+
if arn is not None:
106+
return arn
107+
return ""

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/aws_xray_remote_sampler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def should_sample(
9191
) -> SamplingResult:
9292

9393
if self.__rule_cache.expired():
94-
_logger.info("Rule cache is expired so using fallback sampling strategy")
94+
_logger.debug("Rule cache is expired so using fallback sampling strategy")
9595
return self.__fallback_sampler.should_sample(
9696
parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
9797
)

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/sampler/test_rule_cache.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@
1212
class TestRuleCache(TestCase):
1313
def test_cache_update_rules_and_sorts_rules(self):
1414
cache = _RuleCache(None, None, datetime, Lock())
15-
self.assertTrue(len(cache.rules) == 0)
15+
self.assertTrue(len(cache._RuleCache__rule_appliers) == 0)
1616

1717
rule1 = _SamplingRule(Priority=200, RuleName="only_one_rule", Version=1)
1818
rules = [rule1]
1919
cache.update_sampling_rules(rules)
20-
self.assertTrue(len(cache.rules) == 1)
20+
self.assertTrue(len(cache._RuleCache__rule_appliers) == 1)
2121

2222
rule1 = _SamplingRule(Priority=200, RuleName="abcdef", Version=1)
2323
rule2 = _SamplingRule(Priority=100, RuleName="abc", Version=1)
@@ -28,13 +28,13 @@ def test_cache_update_rules_and_sorts_rules(self):
2828
rules = [rule1, rule2, rule3, rule4, rule5, rule6]
2929
cache.update_sampling_rules(rules)
3030

31-
self.assertTrue(len(cache.rules) == 6)
32-
self.assertEqual(cache.rules[0].sampling_rule.RuleName, "abcdef")
33-
self.assertEqual(cache.rules[1].sampling_rule.RuleName, "A")
34-
self.assertEqual(cache.rules[2].sampling_rule.RuleName, "Abc")
35-
self.assertEqual(cache.rules[3].sampling_rule.RuleName, "ab")
36-
self.assertEqual(cache.rules[4].sampling_rule.RuleName, "abc")
37-
self.assertEqual(cache.rules[5].sampling_rule.RuleName, "abcdef")
31+
self.assertTrue(len(cache._RuleCache__rule_appliers) == 6)
32+
self.assertEqual(cache._RuleCache__rule_appliers[0].sampling_rule.RuleName, "abcdef")
33+
self.assertEqual(cache._RuleCache__rule_appliers[1].sampling_rule.RuleName, "A")
34+
self.assertEqual(cache._RuleCache__rule_appliers[2].sampling_rule.RuleName, "Abc")
35+
self.assertEqual(cache._RuleCache__rule_appliers[3].sampling_rule.RuleName, "ab")
36+
self.assertEqual(cache._RuleCache__rule_appliers[4].sampling_rule.RuleName, "abc")
37+
self.assertEqual(cache._RuleCache__rule_appliers[5].sampling_rule.RuleName, "abcdef")
3838

3939
def test_rule_cache_expiration_logic(self):
4040
dt = datetime
@@ -54,34 +54,34 @@ def test_update_cache_with_only_one_rule_changed(self):
5454
rules = [rule1, rule2, rule3]
5555
cache.update_sampling_rules(rules)
5656

57-
cache_rules_copy = cache.rules
57+
cache_rules_copy = cache._RuleCache__rule_appliers
5858

5959
new_rule3 = _SamplingRule(Priority=5, RuleName="Abc", Version=1)
6060
rules = [rule1, rule2, new_rule3]
6161
cache.update_sampling_rules(rules)
6262

63-
self.assertTrue(len(cache.rules) == 3)
64-
self.assertEqual(cache.rules[0].sampling_rule.RuleName, "abcdef")
65-
self.assertEqual(cache.rules[1].sampling_rule.RuleName, "Abc")
66-
self.assertEqual(cache.rules[2].sampling_rule.RuleName, "ab")
63+
self.assertTrue(len(cache._RuleCache__rule_appliers) == 3)
64+
self.assertEqual(cache._RuleCache__rule_appliers[0].sampling_rule.RuleName, "abcdef")
65+
self.assertEqual(cache._RuleCache__rule_appliers[1].sampling_rule.RuleName, "Abc")
66+
self.assertEqual(cache._RuleCache__rule_appliers[2].sampling_rule.RuleName, "ab")
6767

6868
# Compare that only rule1 and rule2 objects have not changed due to new_rule3 even after sorting
69-
self.assertTrue(cache_rules_copy[0] is cache.rules[0])
70-
self.assertTrue(cache_rules_copy[1] is cache.rules[2])
71-
self.assertTrue(cache_rules_copy[2] is not cache.rules[1])
69+
self.assertTrue(cache_rules_copy[0] is cache._RuleCache__rule_appliers[0])
70+
self.assertTrue(cache_rules_copy[1] is cache._RuleCache__rule_appliers[2])
71+
self.assertTrue(cache_rules_copy[2] is not cache._RuleCache__rule_appliers[1])
7272

7373
def test_update_rules_removes_older_rule(self):
7474
cache = _RuleCache(None, None, datetime, Lock())
75-
self.assertTrue(len(cache.rules) == 0)
75+
self.assertTrue(len(cache._RuleCache__rule_appliers) == 0)
7676

7777
rule1 = _SamplingRule(Priority=200, RuleName="first_rule", Version=1)
7878
rules = [rule1]
7979
cache.update_sampling_rules(rules)
80-
self.assertTrue(len(cache.rules) == 1)
81-
self.assertEqual(cache.rules[0].sampling_rule.RuleName, "first_rule")
80+
self.assertTrue(len(cache._RuleCache__rule_appliers) == 1)
81+
self.assertEqual(cache._RuleCache__rule_appliers[0].sampling_rule.RuleName, "first_rule")
8282

8383
rule1 = _SamplingRule(Priority=200, RuleName="second_rule", Version=1)
8484
rules = [rule1]
8585
cache.update_sampling_rules(rules)
86-
self.assertTrue(len(cache.rules) == 1)
87-
self.assertEqual(cache.rules[0].sampling_rule.RuleName, "second_rule")
86+
self.assertTrue(len(cache._RuleCache__rule_appliers) == 1)
87+
self.assertEqual(cache._RuleCache__rule_appliers[0].sampling_rule.RuleName, "second_rule")

0 commit comments

Comments
 (0)