Skip to content

Commit 21dd7ac

Browse files
committed
remote sampling - rules caching and rules matching
1 parent 543792f commit 21dd7ac

File tree

13 files changed

+949
-28
lines changed

13 files changed

+949
-28
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_aws_xray_sampling_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212

1313
class _AwsXRaySamplingClient:
14-
def __init__(self, endpoint=None, log_level=None):
14+
def __init__(self, endpoint: str = None, log_level: str = None):
1515
# Override default log level
1616
if log_level is not None:
1717
_logger.setLevel(log_level)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Optional, Sequence
4+
5+
from opentelemetry.context import Context
6+
from opentelemetry.sdk.trace.sampling import ALWAYS_ON, Sampler, SamplingResult, TraceIdRatioBased
7+
from opentelemetry.trace import Link, SpanKind
8+
from opentelemetry.trace.span import TraceState
9+
from opentelemetry.util.types import Attributes
10+
11+
12+
class _FallbackSampler(Sampler):
    """Sampler meant to back up rule-based sampling.

    The reservoir / fixed-rate behaviour named in the description string is
    not implemented yet (see the TODOs below); every sampling decision is
    currently delegated to ``ALWAYS_ON``.
    """

    def __init__(self):
        # TODO: Add Reservoir sampler
        # Built with a 0.05 ratio but not consulted by should_sample yet.
        # pylint: disable=unused-private-member
        self.__fixed_rate_sampler = TraceIdRatioBased(0.05)

    # pylint: disable=no-self-use
    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: Sequence[Link] = None,
        trace_state: TraceState = None,
    ) -> SamplingResult:
        """Return the decision ``ALWAYS_ON`` makes for the given span arguments."""
        # TODO: add reservoir + fixed rate sampling
        decision = ALWAYS_ON.should_sample(
            parent_context,
            trace_id,
            name,
            kind=kind,
            attributes=attributes,
            links=links,
            trace_state=trace_state,
        )
        return decision

    # pylint: disable=no-self-use
    def get_description(self) -> str:
        """Return the fixed, human-readable description of this sampler."""
        return "FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}"
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import re
4+
5+
from opentelemetry.semconv.resource import CloudPlatformValues
6+
from opentelemetry.util.types import Attributes
7+
8+
# Translates OpenTelemetry ``cloud.platform`` values into AWS resource-type
# identifiers (for example "AWS::EC2::Instance"). Platforms that are not
# listed here have no entry; callers decide what to use in that case.
cloud_platform_mapping = {
    CloudPlatformValues.AWS_LAMBDA.value: "AWS::Lambda::Function",
    CloudPlatformValues.AWS_ELASTIC_BEANSTALK.value: "AWS::ElasticBeanstalk::Environment",
    CloudPlatformValues.AWS_EC2.value: "AWS::EC2::Instance",
    CloudPlatformValues.AWS_ECS.value: "AWS::ECS::Container",
    CloudPlatformValues.AWS_EKS.value: "AWS::EKS::Container",
}
15+
16+
17+
class _Matcher:
18+
@staticmethod
19+
def wild_card_match(text: str = None, pattern: str = None) -> bool:
20+
if pattern == "*":
21+
return True
22+
if text is None or pattern is None:
23+
return False
24+
if len(pattern) == 0:
25+
return len(text) == 0
26+
for char in pattern:
27+
if char in ("*", "?"):
28+
return re.fullmatch(_Matcher.to_regex_pattern(pattern), text) is not None
29+
return pattern == text
30+
31+
@staticmethod
32+
def to_regex_pattern(rule_pattern: str) -> str:
33+
token_start = -1
34+
regex_pattern = ""
35+
for index, char in enumerate(rule_pattern):
36+
char = rule_pattern[index]
37+
if char in ("*", "?"):
38+
if token_start != -1:
39+
regex_pattern += re.escape(rule_pattern[token_start:index])
40+
token_start = -1
41+
if char == "*":
42+
regex_pattern += ".*"
43+
else:
44+
regex_pattern += "."
45+
else:
46+
if token_start == -1:
47+
token_start = index
48+
if token_start != -1:
49+
regex_pattern += re.escape(rule_pattern[token_start:])
50+
return regex_pattern
51+
52+
@staticmethod
53+
def attribute_match(attributes: Attributes = None, rule_attributes: dict = None) -> bool:
54+
if rule_attributes is None or len(rule_attributes) == 0:
55+
return True
56+
if attributes is None or len(attributes) == 0 or len(rule_attributes) > len(attributes):
57+
return False
58+
59+
matched_count = 0
60+
for key, val in attributes.items():
61+
text_to_match = val
62+
pattern = rule_attributes.get(key, None)
63+
if pattern is None:
64+
continue
65+
if _Matcher.wild_card_match(text_to_match, pattern):
66+
matched_count += 1
67+
return matched_count == len(rule_attributes)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Optional, Sequence
4+
5+
from amazon.opentelemetry.distro.sampler._matcher import _Matcher, cloud_platform_mapping
6+
from amazon.opentelemetry.distro.sampler._sampling_rule import _SamplingRule
7+
from opentelemetry.context import Context
8+
from opentelemetry.sdk.resources import Resource
9+
from opentelemetry.sdk.trace.sampling import ALWAYS_ON, SamplingResult
10+
from opentelemetry.semconv.resource import ResourceAttributes
11+
from opentelemetry.semconv.trace import SpanAttributes
12+
from opentelemetry.trace import Link, SpanKind
13+
from opentelemetry.trace.span import TraceState
14+
from opentelemetry.util.types import Attributes
15+
16+
17+
class _Rule:
    """Couples a ``_SamplingRule`` with the sampler used for spans it matches."""

    def __init__(self, sampling_rule: _SamplingRule):
        self.sampling_rule = sampling_rule
        # TODO add self.next_target_fetch_time from maybe time.process_time() or cache's datetime object
        # TODO add statistics
        # TODO change to rate limiter given rate, add fixed rate sampler
        self.reservoir_sampler = ALWAYS_ON
        # self.fixed_rate_sampler = None
        # TODO add clientId

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: Sequence[Link] = None,
        trace_state: TraceState = None,
    ) -> SamplingResult:
        """Forward the sampling decision to this rule's reservoir sampler."""
        sampler = self.reservoir_sampler
        return sampler.should_sample(
            parent_context,
            trace_id,
            name,
            kind=kind,
            attributes=attributes,
            links=links,
            trace_state=trace_state,
        )

    def matches(self, resource: Resource, attributes: Attributes) -> bool:
        """Return True when the span attributes and resource satisfy every field of the rule.

        The rule's Attributes, URLPath, HTTPMethod, Host, ServiceName and
        ServiceType are checked in that order; the first failing check ends
        the evaluation.
        """
        if attributes is None:
            http_target = None
            http_method = None
            http_url = None
            http_host = None
        else:
            http_target = attributes.get(SpanAttributes.HTTP_TARGET, None)
            http_method = attributes.get(SpanAttributes.HTTP_METHOD, None)
            http_url = attributes.get(SpanAttributes.HTTP_URL, None)
            http_host = attributes.get(SpanAttributes.HTTP_HOST, None)
            # NOTE: The above span attribute keys are deprecated in favor of:
            # URL_PATH/URL_QUERY, HTTP_REQUEST_METHOD, URL_FULL, SERVER_ADDRESS/SERVER_PORT
            # For now, the old attribute keys are kept for consistency with other centralized samplers

        # Resource shouldn't be none as it should default to empty resource
        service_name = None if resource is None else resource.attributes.get(ResourceAttributes.SERVICE_NAME, "")

        # The target may only be available as part of the full URL.
        if http_target is None and http_url is not None:
            scheme_separator = "://"
            scheme_end = http_url.find(scheme_separator)
            # Per spec, http.url is always populated with scheme://host/target. If scheme doesn't
            # match, assume it's bad instrumentation and ignore.
            if scheme_end > -1:
                path_start = http_url.find("/", scheme_end + len(scheme_separator))
                # A URL without a path after the host is treated as the root path.
                http_target = "/" if path_start == -1 else http_url[path_start:]

        rule = self.sampling_rule
        if not _Matcher.attribute_match(attributes, rule.Attributes):
            return False
        if not _Matcher.wild_card_match(http_target, rule.URLPath):
            return False
        if not _Matcher.wild_card_match(http_method, rule.HTTPMethod):
            return False
        if not _Matcher.wild_card_match(http_host, rule.Host):
            return False
        if not _Matcher.wild_card_match(service_name, rule.ServiceName):
            return False
        return _Matcher.wild_card_match(self.get_service_type(resource), rule.ServiceType)

    # pylint: disable=no-self-use
    def get_service_type(self, resource: Resource) -> str:
        """Return the mapped service type for the resource's cloud platform, or "" when unknown."""
        platform = None if resource is None else resource.attributes.get(ResourceAttributes.CLOUD_PLATFORM, None)
        return "" if platform is None else cloud_platform_mapping.get(platform, "")
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import copy
4+
import datetime
5+
from logging import getLogger
6+
from threading import Lock
7+
from typing import Optional, Sequence
8+
9+
from amazon.opentelemetry.distro.sampler._fallback_sampler import _FallbackSampler
10+
from amazon.opentelemetry.distro.sampler._rule import _Rule
11+
from amazon.opentelemetry.distro.sampler._sampling_rule import _SamplingRule
12+
from opentelemetry.context import Context
13+
from opentelemetry.sdk.resources import Resource
14+
from opentelemetry.sdk.trace.sampling import SamplingResult
15+
from opentelemetry.trace import Link, SpanKind
16+
from opentelemetry.trace.span import TraceState
17+
from opentelemetry.util.types import Attributes
18+
19+
_logger = getLogger(__name__)

# Age in seconds (one hour) after which _RuleCache.expired() reports the cache as stale.
CACHE_TTL_SECONDS = 60 * 60
22+
23+
24+
class _RuleCache:
    """Holds the current sampling rules and picks the sampler for each span.

    ``rules`` contains ``_Rule`` objects in the order produced by sorting
    their ``_SamplingRule`` objects. ``should_sample`` uses the first rule
    that matches and falls back to the fallback sampler when none does.
    """

    def __init__(self, resource: Resource, fallback_sampler: _FallbackSampler, date_time: datetime, lock: Lock):
        """Create an empty cache.

        Args:
            resource: Resource passed to every rule's ``matches`` call.
            fallback_sampler: Sampler used when no cached rule matches.
            date_time: Object exposing ``datetime.now()`` (e.g. the
                ``datetime`` module); used for every clock read so callers
                can substitute their own clock.
            lock: Lock guarding rule replacement and the modification time.
        """
        self.__cache_lock = lock
        self.__resource = resource
        self._fallback_sampler = fallback_sampler
        self._date_time = date_time
        self._last_modified = self._date_time.datetime.now()
        # Per-instance list: a class-level default would be one list object
        # shared by every cache instance.
        self.rules = []

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: Sequence[Link] = None,
        trace_state: TraceState = None,
    ) -> SamplingResult:
        """Sample with the first cached rule that matches, else with the fallback sampler."""
        for rule in self.rules:
            if rule.matches(self.__resource, attributes):
                return rule.should_sample(
                    parent_context,
                    trace_id,
                    name,
                    kind=kind,
                    attributes=attributes,
                    links=links,
                    trace_state=trace_state,
                )

        return self._fallback_sampler.should_sample(
            parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
        )

    def update_sampling_rules(self, new_sampling_rules: [_SamplingRule]) -> None:
        """Replace the cached rules with ``new_sampling_rules``.

        Rules with an empty name or a Version other than 1 are skipped. A
        rule whose definition is equal to the cached rule of the same name
        keeps the existing ``_Rule`` object so its state is preserved. The
        caller's list is left untouched.
        """
        temp_rules = []
        # sorted() rather than list.sort() so the caller's list is not reordered.
        for sampling_rule in sorted(new_sampling_rules):
            if sampling_rule.RuleName == "":
                _logger.info("sampling rule without rule name is not supported")
                continue
            if sampling_rule.Version != 1:
                _logger.info("sampling rule without Version 1 is not supported: RuleName: %s", sampling_rule.RuleName)
                continue
            temp_rules.append(_Rule(copy.deepcopy(sampling_rule)))

        # The context manager releases the lock even if something below raises.
        with self.__cache_lock:
            # map list of rules by each rule's sampling_rule name
            rule_map = {rule.sampling_rule.RuleName: rule for rule in self.rules}

            # If a sampling rule has not changed, keep its respective rule in the cache.
            for index, new_rule in enumerate(temp_rules):
                previous_rule = rule_map.get(new_rule.sampling_rule.RuleName)
                if previous_rule is not None and new_rule.sampling_rule == previous_rule.sampling_rule:
                    temp_rules[index] = previous_rule
            self.rules = temp_rules
            # Use the injected clock, the same one __init__ read from.
            self._last_modified = self._date_time.datetime.now()

    def expired(self) -> bool:
        """Return True when the rules were last updated more than ``CACHE_TTL_SECONDS`` ago."""
        with self.__cache_lock:
            time_to_live = datetime.timedelta(seconds=CACHE_TTL_SECONDS)
            return self._date_time.datetime.now() > self._last_modified + time_to_live

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_sampling_rule.py

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@ class _SamplingRule:
88
def __init__(
99
self,
1010
Attributes: dict = None,
11-
FixedRate=None,
12-
HTTPMethod=None,
13-
Host=None,
14-
Priority=None,
15-
ReservoirSize=None,
16-
ResourceARN=None,
17-
RuleARN=None,
18-
RuleName=None,
19-
ServiceName=None,
20-
ServiceType=None,
21-
URLPath=None,
22-
Version=None,
11+
FixedRate: float = None,
12+
HTTPMethod: str = None,
13+
Host: str = None,
14+
Priority: int = None,
15+
ReservoirSize: int = None,
16+
ResourceARN: str = None,
17+
RuleARN: str = None,
18+
RuleName: str = None,
19+
ServiceName: str = None,
20+
ServiceType: str = None,
21+
URLPath: str = None,
22+
Version: int = None,
2323
):
2424
self.Attributes = Attributes if Attributes is not None else {}
2525
self.FixedRate = FixedRate if FixedRate is not None else 0.0
@@ -35,3 +35,29 @@ def __init__(
3535
self.ServiceType = ServiceType if ServiceType is not None else ""
3636
self.URLPath = URLPath if URLPath is not None else ""
3737
self.Version = Version if Version is not None else 0
38+
39+
def __lt__(self, other) -> bool:
40+
if self.Priority == other.Priority:
41+
# String order priority example:
42+
# "A","Abc","a","ab","abc","abcdef"
43+
return self.RuleName < other.RuleName
44+
return self.Priority < other.Priority
45+
46+
def __eq__(self, other: object) -> bool:
    """Return True when ``other`` is a ``_SamplingRule`` with equal values in every rule field."""
    if not isinstance(other, _SamplingRule):
        return False
    # Compared in this order; the first differing field ends the comparison.
    compared_fields = (
        "FixedRate",
        "HTTPMethod",
        "Host",
        "Priority",
        "ReservoirSize",
        "ResourceARN",
        "RuleARN",
        "RuleName",
        "ServiceName",
        "ServiceType",
        "URLPath",
        "Version",
        "Attributes",
    )
    return all(getattr(self, field) == getattr(other, field) for field in compared_fields)

0 commit comments

Comments
 (0)