Skip to content

Commit 2a68194

Browse files
committed
remote sampling - initial classes and rules poller
1 parent 3c822a8 commit 2a68194

File tree

3 files changed

+156
-0
lines changed

3 files changed

+156
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
# SPDX-License-Identifier: Apache-2.0
5+
import json
6+
from logging import getLogger
7+
from threading import Timer
8+
from typing import Optional, Sequence
9+
10+
from amazon.opentelemetry.distro.sampler.aws_xray_sampling_client import AwsXRaySamplingClient
11+
from opentelemetry.context import Context
12+
from opentelemetry.sdk.resources import Resource
13+
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF, Sampler, SamplingResult
14+
from opentelemetry.trace import Link, SpanKind
15+
from opentelemetry.trace.span import TraceState
16+
from opentelemetry.util.types import Attributes
17+
18+
_logger = getLogger(__name__)
19+
20+
DEFAULT_RULES_POLLING_INTERVAL = 300
21+
DEFAULT_TARGET_POLLING_INTERVAL = 10
22+
DEFAULT_SAMPLING_PROXY_ENDPOINT = "http://127.0.0.1:2000"
23+
24+
class AwsXRayRemoteSampler(Sampler):
25+
"""
26+
Remote Sampler for OpenTelemetry that gets sampling configurations from AWS X-Ray
27+
28+
Args:
29+
resource: OpenTelemetry Resource (Optional)
30+
endpoint: proxy endpoint for AWS X-Ray Sampling (Optional)
31+
polling_interval: Polling interval for getSamplingRules call (Optional)
32+
log_level: custom log level configuration for remote sampler (Optional)
33+
"""
34+
35+
__resource : Resource
36+
__polling_interval : int
37+
__xray_client : AwsXRaySamplingClient
38+
39+
def __init__(self, resource=None, endpoint=DEFAULT_SAMPLING_PROXY_ENDPOINT, polling_interval=DEFAULT_RULES_POLLING_INTERVAL, log_level = None):
40+
# Override default log level
41+
if log_level is not None:
42+
_logger.setLevel(log_level)
43+
44+
self.__xray_client = AwsXRaySamplingClient(endpoint, log_level=log_level)
45+
self.__polling_interval = polling_interval
46+
self.__resource = resource
47+
48+
self.__start_sampling_rule_poller()
49+
50+
def should_sample(
51+
self,
52+
parent_context: Optional["Context"],
53+
trace_id: int,
54+
name: str,
55+
kind: SpanKind = None,
56+
attributes: Attributes = None,
57+
links: Sequence["Link"] = None,
58+
trace_state: "TraceState" = None,
59+
) -> "SamplingResult":
60+
# TODO: add sampling functionality
61+
return ALWAYS_OFF
62+
63+
def get_description(self) -> str:
64+
description = "AwsXRayRemoteSampler{remote sampling with AWS X-Ray}"
65+
return description
66+
67+
def __get_and_update_sampling_rules(self):
68+
sampling_rules = self.__xray_client.get_sampling_rules()
69+
70+
# TODO: Update sampling rules cache
71+
_logger.info(f"Got Sampling Rules: {json.dumps([ob.__dict__ for ob in sampling_rules])}")
72+
73+
def __start_sampling_rule_poller(self):
74+
self.__get_and_update_sampling_rules()
75+
# Schedule the next sampling rule poll
76+
self._timer = Timer(self.__polling_interval, self.__start_sampling_rule_poller)
77+
self._timer.daemon = True
78+
self._timer.start()
79+
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import json
4+
from logging import getLogger
5+
6+
import requests
7+
8+
from amazon.opentelemetry.distro.sampler.sampling_rule import SamplingRule
9+
10+
_logger = getLogger(__name__)
11+
12+
class AwsXRaySamplingClient:
13+
def __init__(self, endpoint=None, log_level=None):
14+
# Override default log level
15+
if log_level is not None:
16+
_logger.setLevel(log_level)
17+
18+
if endpoint is None:
19+
_logger.error("endpoint must be specified")
20+
self.__getSamplingRulesEndpoint = endpoint + "/GetSamplingRules"
21+
22+
def get_sampling_rules(self):
23+
sampling_rules = []
24+
headers = {'content-type': 'application/json'}
25+
26+
try:
27+
r = requests.post(url=self.__getSamplingRulesEndpoint, headers=headers)
28+
if r is None:
29+
raise Exception("GetSamplingRules response is None")
30+
sampling_rules_response = r.json()
31+
if "SamplingRuleRecords" not in sampling_rules_response:
32+
raise Exception(f"SamplingRuleRecords is missing in getSamplingRules response:{sampling_rules_response}")
33+
34+
sampling_rules_records = sampling_rules_response["SamplingRuleRecords"]
35+
for record in sampling_rules_records:
36+
sampling_rules.append(SamplingRule(**record["SamplingRule"]))
37+
38+
except requests.exceptions.RequestException as req_err:
39+
_logger.exception(f"Request error occurred: {req_err}")
40+
except json.JSONDecodeError as json_err:
41+
_logger.exception(f"Error in decoding JSON response: {json_err}")
42+
except Exception as ex:
43+
_logger.exception(f"Exception occurred: {ex}")
44+
45+
return sampling_rules
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
class SamplingRule:
4+
def __init__(
5+
self,
6+
Attributes={},
7+
FixedRate=None,
8+
HTTPMethod=None,
9+
Host=None,
10+
Priority=None,
11+
ReservoirSize=None,
12+
ResourceARN=None,
13+
RuleARN=None,
14+
RuleName=None,
15+
ServiceName=None,
16+
ServiceType=None,
17+
URLPath=None,
18+
Version=None
19+
):
20+
self.Attributes=Attributes
21+
self.FixedRate=FixedRate
22+
self.HTTPMethod=HTTPMethod
23+
self.Host=Host
24+
self.Priority=Priority
25+
self.ReservoirSize=ReservoirSize
26+
self.ResourceARN=ResourceARN
27+
self.RuleARN=RuleARN
28+
self.RuleName=RuleName
29+
self.ServiceName=ServiceName
30+
self.ServiceType=ServiceType
31+
self.URLPath=URLPath
32+
self.Version=Version

0 commit comments

Comments
 (0)