Skip to content

Commit 453a3d5

Browse files
authored
AWS X-Ray Remote Sampler Part 3 - rate limiter logic and get sampling targets (#55)
*Issue #, if available:* This is the 3rd and final part of the X-Ray Remote Sampler implementation for Python [See Part 2](#47) *Description of changes:* - Added logic to fetch sampling targets for each sampling rule applier. - The sampling targets are periodically fetched every 10 seconds by making the [GetSamplingTargets API call to X-Ray](https://docs.aws.amazon.com/xray/latest/api/API_GetSamplingTargets.html). - The targets determine the reservoir quota and the rate at which a sampling rule applier will sample the requests. - Each rule applier keeps and updates a sampling statistics document, which is required in the `GetSamplingTargets` call to determine the next target. - Added the rate limiting and fixed rate samplers to be used in each rule applier. - Together these samplers determine how many requests to sample every second and what percentage of additional requests to sample in that second. - The FallbackSampler is a combination of the above samplers to sample 1 req/sec and 5% of additional requests in that second. *Testing:* Unit Tests and Remote Sampling Testbed ## **Testbed:** 1. Have the XRay Daemon running, or an OTel collector with the XRay Proxy Client set up and running. Ensure the AWS credentials used have only the default rule with 5% sampling and 1 req/s. 2. Check out this PR branch, and install with `pip3 install aws-opentelemetry-distro/` 3. Download this Python Sample App: https://github.com/jj22ee/aws-otel-community/tree/python-sample/centralized-sampling-tests/sample-apps/python-flask Install python3 requirements.txt Replace the following: ``` ### ### Set sampler HERE ### ``` with: ``` from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler xray_sampler = AwsXRayRemoteSampler(resource, polling_interval=10) trace.set_tracer_provider(TracerProvider(sampler=xray_sampler)) ``` Run the Python Sample app with `python3 app.py` 4. 
Download this repository: https://github.com/aws-observability/aws-otel-community/ Within directory `centralized-sampling-tests/` run: `./gradlew :integration-tests:run` Check that the tests have passed. TODO: wire in the remote sampler to the ADOT Python customizer in this PR or a new PR By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent ec3011e commit 453a3d5

25 files changed

+1428
-86
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_aws_xray_sampling_client.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import requests
77

88
from amazon.opentelemetry.distro.sampler._sampling_rule import _SamplingRule
9+
from amazon.opentelemetry.distro.sampler._sampling_target import _SamplingTargetResponse
910

1011
_logger = getLogger(__name__)
1112

@@ -19,6 +20,7 @@ def __init__(self, endpoint: str = None, log_level: str = None):
1920
if endpoint is None:
2021
_logger.error("endpoint must be specified")
2122
self.__get_sampling_rules_endpoint = endpoint + "/GetSamplingRules"
23+
self.__get_sampling_targets_endpoint = endpoint + "/SamplingTargets"
2224

2325
def get_sampling_rules(self) -> [_SamplingRule]:
2426
sampling_rules = []
@@ -30,12 +32,11 @@ def get_sampling_rules(self) -> [_SamplingRule]:
3032
_logger.error("GetSamplingRules response is None")
3133
return []
3234
sampling_rules_response = xray_response.json()
33-
if "SamplingRuleRecords" not in sampling_rules_response:
35+
if sampling_rules_response is None or "SamplingRuleRecords" not in sampling_rules_response:
3436
_logger.error(
3537
"SamplingRuleRecords is missing in getSamplingRules response: %s", sampling_rules_response
3638
)
3739
return []
38-
3940
sampling_rules_records = sampling_rules_response["SamplingRuleRecords"]
4041
for record in sampling_rules_records:
4142
if "SamplingRule" not in record:
@@ -47,5 +48,43 @@ def get_sampling_rules(self) -> [_SamplingRule]:
4748
_logger.error("Request error occurred: %s", req_err)
4849
except json.JSONDecodeError as json_err:
4950
_logger.error("Error in decoding JSON response: %s", json_err)
51+
# pylint: disable=broad-exception-caught
52+
except Exception as err:
53+
_logger.error("Error occurred when attempting to fetch rules: %s", err)
5054

5155
return sampling_rules
56+
57+
def get_sampling_targets(self, statistics: [dict]) -> _SamplingTargetResponse:
    """POST the statistics documents to the sampling-targets endpoint and parse the reply.

    :param statistics: list of statistics dicts, sent as ``SamplingStatisticsDocuments``.
    :return: a ``_SamplingTargetResponse`` built from the JSON reply, or one with
        every field set to ``None`` when the request fails, the reply is ``None``,
        or the reply lacks ``SamplingTargetDocuments`` / ``LastRuleModification``.
    """
    # Handed back unchanged on every failure path below.
    empty_response = _SamplingTargetResponse(
        LastRuleModification=None, SamplingTargetDocuments=None, UnprocessedStatistics=None
    )
    required_keys = ("SamplingTargetDocuments", "LastRuleModification")
    try:
        xray_response = requests.post(
            url=self.__get_sampling_targets_endpoint,
            headers={"content-type": "application/json"},
            timeout=20,
            json={"SamplingStatisticsDocuments": statistics},
        )
        if xray_response is None:
            _logger.debug("GetSamplingTargets response is None. Unable to update targets.")
            return empty_response

        payload = xray_response.json()
        if payload is None or any(key not in payload for key in required_keys):
            _logger.debug("getSamplingTargets response is invalid. Unable to update targets.")
            return empty_response

        return _SamplingTargetResponse(**payload)
    except requests.exceptions.RequestException as req_err:
        _logger.debug("Request error occurred: %s", req_err)
    except json.JSONDecodeError as json_err:
        _logger.debug("Error in decoding JSON response: %s", json_err)
    # pylint: disable=broad-exception-caught
    except Exception as err:
        _logger.debug("Error occurred when attempting to fetch targets: %s", err)

    # Only reached after one of the handlers above has logged an error.
    return empty_response
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
import datetime
5+
6+
7+
class _Clock:
    """Date/time helpers backed by the standard ``datetime`` module."""

    def __init__(self):
        # ``now`` is resolved through this stored reference to the datetime class.
        self.__datetime = datetime.datetime

    def now(self) -> datetime.datetime:
        """Return the current date and time from the stored datetime class."""
        current = self.__datetime.now()
        return current

    # pylint: disable=no-self-use
    def from_timestamp(self, timestamp: float) -> datetime.datetime:
        """Convert a POSIX timestamp into a ``datetime`` via ``datetime.fromtimestamp``."""
        converted = datetime.datetime.fromtimestamp(timestamp)
        return converted

    def time_delta(self, seconds: float) -> datetime.timedelta:
        """Return a ``timedelta`` spanning the given number of seconds."""
        span = datetime.timedelta(seconds=seconds)
        return span

    def max(self) -> datetime.datetime:
        """Return ``datetime.datetime.max``, the largest representable datetime."""
        latest = datetime.datetime.max
        return latest

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_fallback_sampler.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from typing import Optional, Sequence
44

5+
from amazon.opentelemetry.distro.sampler._clock import _Clock
6+
from amazon.opentelemetry.distro.sampler._rate_limiting_sampler import _RateLimitingSampler
57
from opentelemetry.context import Context
6-
from opentelemetry.sdk.trace.sampling import ALWAYS_ON, Sampler, SamplingResult, TraceIdRatioBased
8+
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult, TraceIdRatioBased
79
from opentelemetry.trace import Link, SpanKind
810
from opentelemetry.trace.span import TraceState
911
from opentelemetry.util.types import Attributes
1012

1113

1214
class _FallbackSampler(Sampler):
13-
def __init__(self):
14-
# TODO: Add Reservoir sampler
15-
# pylint: disable=unused-private-member
15+
def __init__(self, clock: _Clock):
16+
self.__rate_limiting_sampler = _RateLimitingSampler(1, clock)
1617
self.__fixed_rate_sampler = TraceIdRatioBased(0.05)
1718

1819
# pylint: disable=no-self-use
@@ -26,8 +27,12 @@ def should_sample(
2627
links: Sequence[Link] = None,
2728
trace_state: TraceState = None,
2829
) -> SamplingResult:
29-
# TODO: add reservoir + fixed rate sampling
30-
return ALWAYS_ON.should_sample(
30+
sampling_result = self.__rate_limiting_sampler.should_sample(
31+
parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
32+
)
33+
if sampling_result.decision is not Decision.DROP:
34+
return sampling_result
35+
return self.__fixed_rate_sampler.should_sample(
3136
parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
3237
)
3338

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from decimal import Decimal
4+
from threading import Lock
5+
6+
from amazon.opentelemetry.distro.sampler._clock import _Clock
7+
8+
9+
class _RateLimiter:
    """Time-based spending limiter.

    The available balance is the number of milliseconds elapsed between a stored
    "floor" timestamp and the current time, capped at ``MAX_BALANCE_MILLIS``.
    A spend costs ``cost / (quota / 1000)`` milliseconds; on success the floor
    moves so that only the unspent remainder stays available. The floor starts
    at the construction time, so the initial balance is zero.
    """

    def __init__(self, max_balance_in_seconds: int, quota: int, clock: _Clock):
        self._clock = clock
        self._quota = Decimal(quota)

        # max_balance_in_seconds is usually 1
        # pylint: disable=invalid-name
        self.MAX_BALANCE_MILLIS = Decimal(max_balance_in_seconds * 1000.0)

        # The current balance is always "now - floor" (the ceiling is the current time).
        self.__wallet_floor_millis = Decimal(self._clock.now().timestamp() * 1000.0)
        self.__lock = Lock()

    def try_spend(self, cost: float) -> bool:
        """Try to spend ``cost`` units; return ``True`` when the balance covers it.

        Always returns ``False`` when the quota is zero. A failed attempt leaves
        the wallet state untouched.
        """
        if self._quota == 0:
            return False

        # Quota is non-zero here, so the division below cannot divide by zero.
        cost_in_millis = Decimal(cost) / (self._quota / Decimal(1000.0))

        with self.__lock:
            now_millis = Decimal(self._clock.now().timestamp() * 1000.0)
            available_millis = min(now_millis - self.__wallet_floor_millis, self.MAX_BALANCE_MILLIS)
            leftover_millis = available_millis - cost_in_millis
            if leftover_millis < 0:
                # Not enough balance: no changes to the wallet state.
                return False
            self.__wallet_floor_millis = now_millis - leftover_millis
            return True
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Optional, Sequence
4+
5+
from amazon.opentelemetry.distro.sampler._clock import _Clock
6+
from amazon.opentelemetry.distro.sampler._rate_limiter import _RateLimiter
7+
from opentelemetry.context import Context
8+
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
9+
from opentelemetry.trace import Link, SpanKind
10+
from opentelemetry.trace.span import TraceState
11+
from opentelemetry.util.types import Attributes
12+
13+
14+
class _RateLimitingSampler(Sampler):
    """Sampler whose decision is delegated to a ``_RateLimiter`` reservoir.

    The reservoir is created with a maximum balance of 1 second and the given
    quota; each sampling decision tries to spend one unit from it.
    """

    def __init__(self, quota: int, clock: _Clock):
        """
        :param quota: quota handed to the underlying rate limiter; also reported
            by ``get_description``.
        :param clock: time source handed to the underlying rate limiter.
        """
        self.__quota = quota
        self.__reservoir = _RateLimiter(1, quota, clock)

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: Sequence[Link] = None,
        trace_state: TraceState = None,
    ) -> SamplingResult:
        """Return RECORD_AND_SAMPLE when one unit can be spent from the reservoir, else DROP.

        Only ``attributes`` and ``trace_state`` are forwarded into the result;
        the remaining arguments do not influence the decision.
        """
        if self.__reservoir.try_spend(1):
            return SamplingResult(decision=Decision.RECORD_AND_SAMPLE, attributes=attributes, trace_state=trace_state)
        return SamplingResult(decision=Decision.DROP, attributes=attributes, trace_state=trace_state)

    def get_description(self) -> str:
        """Return a human-readable description that includes the configured quota."""
        # The quota is an int, so it must be converted before concatenation;
        # adding it to a str directly raises TypeError.
        description = (
            "RateLimitingSampler{rate limiting sampling with sampling config of "
            + str(self.__quota)
            + " req/sec and 0% of additional requests}"
        )
        return description

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_rule_cache.py

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import datetime
43
from logging import getLogger
54
from threading import Lock
6-
from typing import Optional, Sequence
5+
from typing import Dict, Optional, Sequence
76

7+
from amazon.opentelemetry.distro.sampler._clock import _Clock
88
from amazon.opentelemetry.distro.sampler._fallback_sampler import _FallbackSampler
99
from amazon.opentelemetry.distro.sampler._sampling_rule import _SamplingRule
1010
from amazon.opentelemetry.distro.sampler._sampling_rule_applier import _SamplingRuleApplier
11+
from amazon.opentelemetry.distro.sampler._sampling_target import _SamplingTarget, _SamplingTargetResponse
1112
from opentelemetry.context import Context
1213
from opentelemetry.sdk.resources import Resource
1314
from opentelemetry.sdk.trace.sampling import SamplingResult
@@ -18,16 +19,20 @@
1819
_logger = getLogger(__name__)
1920

2021
CACHE_TTL_SECONDS = 3600
22+
DEFAULT_TARGET_POLLING_INTERVAL_SECONDS = 10
2123

2224

2325
class _RuleCache:
24-
def __init__(self, resource: Resource, fallback_sampler: _FallbackSampler, date_time: datetime, lock: Lock):
26+
def __init__(
27+
self, resource: Resource, fallback_sampler: _FallbackSampler, client_id: str, clock: _Clock, lock: Lock
28+
):
29+
self.__client_id = client_id
2530
self.__rule_appliers: [_SamplingRuleApplier] = []
2631
self.__cache_lock = lock
2732
self.__resource = resource
2833
self._fallback_sampler = fallback_sampler
29-
self._date_time = date_time
30-
self._last_modified = self._date_time.datetime.now()
34+
self._clock = clock
35+
self._last_modified = self._clock.now()
3136

3237
def should_sample(
3338
self,
@@ -39,6 +44,7 @@ def should_sample(
3944
links: Sequence[Link] = None,
4045
trace_state: TraceState = None,
4146
) -> SamplingResult:
47+
rule_applier: _SamplingRuleApplier
4248
for rule_applier in self.__rule_appliers:
4349
if rule_applier.matches(self.__resource, attributes):
4450
return rule_applier.should_sample(
@@ -51,6 +57,8 @@ def should_sample(
5157
trace_state=trace_state,
5258
)
5359

60+
_logger.debug("No sampling rules were matched")
61+
# Should not ever reach fallback sampler as default rule is able to match
5462
return self._fallback_sampler.should_sample(
5563
parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
5664
)
@@ -65,28 +73,70 @@ def update_sampling_rules(self, new_sampling_rules: [_SamplingRule]) -> None:
6573
if sampling_rule.Version != 1:
6674
_logger.debug("sampling rule without Version 1 is not supported: RuleName: %s", sampling_rule.RuleName)
6775
continue
68-
temp_rule_appliers.append(_SamplingRuleApplier(sampling_rule))
76+
temp_rule_appliers.append(_SamplingRuleApplier(sampling_rule, self.__client_id, self._clock))
6977

7078
self.__cache_lock.acquire()
7179

7280
# map list of rule appliers by each applier's sampling_rule name
73-
rule_applier_map = {rule.sampling_rule.RuleName: rule for rule in self.__rule_appliers}
81+
rule_applier_map: Dict[str, _SamplingRuleApplier] = {
82+
applier.sampling_rule.RuleName: applier for applier in self.__rule_appliers
83+
}
7484

7585
# If a sampling rule has not changed, keep its respective applier in the cache.
86+
new_applier: _SamplingRuleApplier
7687
for index, new_applier in enumerate(temp_rule_appliers):
7788
rule_name_to_check = new_applier.sampling_rule.RuleName
7889
if rule_name_to_check in rule_applier_map:
7990
old_applier = rule_applier_map[rule_name_to_check]
8091
if new_applier.sampling_rule == old_applier.sampling_rule:
8192
temp_rule_appliers[index] = old_applier
8293
self.__rule_appliers = temp_rule_appliers
83-
self._last_modified = datetime.datetime.now()
94+
self._last_modified = self._clock.now()
8495

8596
self.__cache_lock.release()
8697

98+
def update_sampling_targets(self, sampling_targets_response: _SamplingTargetResponse) -> (bool, int):
    """Apply fetched sampling targets to the cached rule appliers.

    Each applier whose rule name has a target is replaced by
    ``applier.with_target(target)``; appliers without a target are kept as is.

    :param sampling_targets_response: response holding ``SamplingTargetDocuments``
        and ``LastRuleModification``.
    :return: ``(refresh_rules, next_polling_interval)``. ``refresh_rules`` is True
        when the response's last rule modification is later than this cache's
        last modification time. ``next_polling_interval`` is the smallest
        non-None ``Interval`` among the applied targets, or
        ``DEFAULT_TARGET_POLLING_INTERVAL_SECONDS`` when there is none.
    """
    targets: [_SamplingTarget] = sampling_targets_response.SamplingTargetDocuments

    with self.__cache_lock:
        polling_interval = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS
        shortest_interval = None

        targets_by_rule: Dict[str, _SamplingTarget] = {target.RuleName: target for target in targets}

        refreshed_appliers = []
        for applier in self.__rule_appliers:
            rule_name = applier.sampling_rule.RuleName
            if rule_name not in targets_by_rule:
                # No target for this rule: keep the applier untouched.
                refreshed_appliers.append(applier)
                continue

            matched_target = targets_by_rule[rule_name]
            refreshed_appliers.append(applier.with_target(matched_target))

            interval = matched_target.Interval
            if interval is not None and (shortest_interval is None or shortest_interval > interval):
                shortest_interval = interval

        self.__rule_appliers = refreshed_appliers

        if shortest_interval is not None:
            polling_interval = shortest_interval

        modified_at = self._clock.from_timestamp(sampling_targets_response.LastRuleModification)
        rules_are_stale = modified_at > self._last_modified

        return (rules_are_stale, polling_interval)
129+
130+
def get_all_statistics(self) -> [dict]:
    """Collect the statistics of every cached rule applier, in cache order.

    Calls ``get_then_reset_statistics()`` on each applier and returns the
    results as a list.
    """
    return [applier.get_then_reset_statistics() for applier in self.__rule_appliers]
136+
87137
def expired(self) -> bool:
88138
self.__cache_lock.acquire()
89139
try:
90-
return datetime.datetime.now() > self._last_modified + datetime.timedelta(seconds=CACHE_TTL_SECONDS)
140+
return self._clock.now() > self._last_modified + self._clock.time_delta(seconds=CACHE_TTL_SECONDS)
91141
finally:
92142
self.__cache_lock.release()

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_sampling_rule.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __init__(
3636
self.URLPath = URLPath if URLPath is not None else ""
3737
self.Version = Version if Version is not None else 0
3838

39-
def __lt__(self, other) -> bool:
39+
def __lt__(self, other: "_SamplingRule") -> bool:
4040
if self.Priority == other.Priority:
4141
# String order priority example:
4242
# "A","Abc","a","ab","abc","abcdef"

0 commit comments

Comments
 (0)