17
17
import sys
18
18
from time import perf_counter
19
19
from typing import List
20
+ import functools
21
+ import requests
20
22
23
+ import boto3
24
+ from sagemaker .session import Session
21
25
from sagemaker .utils import resolve_value_from_config
22
26
from sagemaker .config .config_schema import TELEMETRY_OPT_OUT_PATH
23
27
from sagemaker .telemetry .constants import (
47
51
FEATURE_TO_CODE = {
48
52
str (Feature .SDK_DEFAULTS ): 1 ,
49
53
str (Feature .LOCAL_MODE ): 2 ,
54
+ str (Feature .REMOTE_FUNCTION ): 3 ,
50
55
}
51
56
52
57
STATUS_TO_CODE = {
@@ -59,86 +64,103 @@ def _telemetry_emitter(feature: str, func_name: str):
59
64
"""Decorator to emit telemetry logs for SageMaker Python SDK functions"""
60
65
61
66
def decorator (func ):
62
- def wrapper (self , * args , ** kwargs ):
63
- logger .info (TELEMETRY_OPT_OUT_MESSAGING )
64
- response = None
65
- caught_ex = None
66
- studio_app_type = process_studio_metadata_file ()
67
-
68
- # Check if telemetry is opted out
69
- telemetry_opt_out_flag = resolve_value_from_config (
70
- direct_input = None ,
71
- config_path = TELEMETRY_OPT_OUT_PATH ,
72
- default_value = False ,
73
- sagemaker_session = self .sagemaker_session ,
74
- )
75
- logger .debug ("TelemetryOptOut flag is set to: %s" , telemetry_opt_out_flag )
76
-
77
- # Construct the feature list to track feature combinations
78
- feature_list : List [int ] = [FEATURE_TO_CODE [str (feature )]]
79
- if self .sagemaker_session :
80
- if self .sagemaker_session .sagemaker_config and feature != Feature .SDK_DEFAULTS :
67
+ @functools .wraps (func )
68
+ def wrapper (* args , ** kwargs ):
69
+ sagemaker_session = None
70
+ if len (args ) > 0 and hasattr (args [0 ], "sagemaker_session" ):
71
+ # Get the sagemaker_session from the instance method args
72
+ sagemaker_session = args [0 ].sagemaker_session
73
+ elif feature == Feature .REMOTE_FUNCTION :
74
+ # Get the sagemaker_session from the function keyword arguments for remote function
75
+ sagemaker_session = kwargs .get (
76
+ "sagemaker_session" , _get_default_sagemaker_session ()
77
+ )
78
+
79
+ if sagemaker_session :
80
+ logger .debug ("sagemaker_session found, preparing to emit telemetry..." )
81
+ logger .info (TELEMETRY_OPT_OUT_MESSAGING )
82
+ response = None
83
+ caught_ex = None
84
+ studio_app_type = process_studio_metadata_file ()
85
+
86
+ # Check if telemetry is opted out
87
+ telemetry_opt_out_flag = resolve_value_from_config (
88
+ direct_input = None ,
89
+ config_path = TELEMETRY_OPT_OUT_PATH ,
90
+ default_value = False ,
91
+ sagemaker_session = sagemaker_session ,
92
+ )
93
+ logger .debug ("TelemetryOptOut flag is set to: %s" , telemetry_opt_out_flag )
94
+
95
+ # Construct the feature list to track feature combinations
96
+ feature_list : List [int ] = [FEATURE_TO_CODE [str (feature )]]
97
+
98
+ if sagemaker_session .sagemaker_config and feature != Feature .SDK_DEFAULTS :
81
99
feature_list .append (FEATURE_TO_CODE [str (Feature .SDK_DEFAULTS )])
82
100
83
- if self . sagemaker_session .local_mode and feature != Feature .LOCAL_MODE :
101
+ if sagemaker_session .local_mode and feature != Feature .LOCAL_MODE :
84
102
feature_list .append (FEATURE_TO_CODE [str (Feature .LOCAL_MODE )])
85
103
86
- # Construct the extra info to track platform and environment usage metadata
87
- extra = (
88
- f"{ func_name } "
89
- f"&x-sdkVersion={ SDK_VERSION } "
90
- f"&x-env={ PYTHON_VERSION } "
91
- f"&x-sys={ OS_NAME_VERSION } "
92
- f"&x-platform={ studio_app_type } "
93
- )
94
-
95
- # Add endpoint ARN to the extra info if available
96
- if self .sagemaker_session and self .sagemaker_session .endpoint_arn :
97
- extra += f"&x-endpointArn={ self .sagemaker_session .endpoint_arn } "
98
-
99
- start_timer = perf_counter ()
100
- try :
101
- # Call the original function
102
- response = func (self , * args , ** kwargs )
103
- stop_timer = perf_counter ()
104
- elapsed = stop_timer - start_timer
105
- extra += f"&x-latency={ round (elapsed , 2 )} "
106
- if not telemetry_opt_out_flag :
107
- _send_telemetry_request (
108
- STATUS_TO_CODE [str (Status .SUCCESS )],
109
- feature_list ,
110
- self .sagemaker_session ,
111
- None ,
112
- None ,
113
- extra ,
114
- )
115
- except Exception as e : # pylint: disable=W0703
116
- stop_timer = perf_counter ()
117
- elapsed = stop_timer - start_timer
118
- extra += f"&x-latency={ round (elapsed , 2 )} "
119
- if not telemetry_opt_out_flag :
120
- _send_telemetry_request (
121
- STATUS_TO_CODE [str (Status .FAILURE )],
122
- feature_list ,
123
- self .sagemaker_session ,
124
- str (e ),
125
- e .__class__ .__name__ ,
126
- extra ,
127
- )
128
- caught_ex = e
129
- finally :
130
- if caught_ex :
131
- raise caught_ex
132
- return response # pylint: disable=W0150
104
+ # Construct the extra info to track platform and environment usage metadata
105
+ extra = (
106
+ f"{ func_name } "
107
+ f"&x-sdkVersion={ SDK_VERSION } "
108
+ f"&x-env={ PYTHON_VERSION } "
109
+ f"&x-sys={ OS_NAME_VERSION } "
110
+ f"&x-platform={ studio_app_type } "
111
+ )
112
+
113
+ # Add endpoint ARN to the extra info if available
114
+ if sagemaker_session .endpoint_arn :
115
+ extra += f"&x-endpointArn={ sagemaker_session .endpoint_arn } "
116
+
117
+ start_timer = perf_counter ()
118
+ try :
119
+ # Call the original function
120
+ response = func (* args , ** kwargs )
121
+ stop_timer = perf_counter ()
122
+ elapsed = stop_timer - start_timer
123
+ extra += f"&x-latency={ round (elapsed , 2 )} "
124
+ if not telemetry_opt_out_flag :
125
+ _send_telemetry_request (
126
+ STATUS_TO_CODE [str (Status .SUCCESS )],
127
+ feature_list ,
128
+ sagemaker_session ,
129
+ None ,
130
+ None ,
131
+ extra ,
132
+ )
133
+ except Exception as e : # pylint: disable=W0703
134
+ stop_timer = perf_counter ()
135
+ elapsed = stop_timer - start_timer
136
+ extra += f"&x-latency={ round (elapsed , 2 )} "
137
+ if not telemetry_opt_out_flag :
138
+ _send_telemetry_request (
139
+ STATUS_TO_CODE [str (Status .FAILURE )],
140
+ feature_list ,
141
+ sagemaker_session ,
142
+ str (e ),
143
+ e .__class__ .__name__ ,
144
+ extra ,
145
+ )
146
+ caught_ex = e
147
+ finally :
148
+ if caught_ex :
149
+ raise caught_ex
150
+ return response # pylint: disable=W0150
151
+ else :
152
+ logger .debug (
153
+ "Unable to send telemetry for function %s. "
154
+ "sagemaker_session is not provided or not valid." ,
155
+ func_name ,
156
+ )
157
+ return func (* args , ** kwargs )
133
158
134
159
return wrapper
135
160
136
161
return decorator
137
162
138
163
139
- from sagemaker .session import Session # noqa: E402 pylint: disable=C0413
140
-
141
-
142
164
def _send_telemetry_request (
143
165
status : int ,
144
166
feature_list : List [int ],
@@ -165,9 +187,9 @@ def _send_telemetry_request(
165
187
# Send the telemetry request
166
188
logger .debug ("Sending telemetry request to [%s]" , url )
167
189
_requests_helper (url , 2 )
168
- logger .debug ("SageMaker Python SDK telemetry successfully emitted! " )
190
+ logger .debug ("SageMaker Python SDK telemetry successfully emitted. " )
169
191
except Exception : # pylint: disable=W0703
170
- logger .debug ("SageMaker Python SDK telemetry not emitted!! " )
192
+ logger .debug ("SageMaker Python SDK telemetry not emitted!" )
171
193
172
194
173
195
def _construct_url (
@@ -196,9 +218,6 @@ def _construct_url(
196
218
return base_url
197
219
198
220
199
- import requests # noqa: E402 pylint: disable=C0413,C0411
200
-
201
-
202
221
def _requests_helper (url , timeout ):
203
222
"""Make a GET request to the given URL"""
204
223
@@ -227,3 +246,11 @@ def _get_region_or_default(session):
227
246
return session .boto_session .region_name
228
247
except Exception : # pylint: disable=W0703
229
248
return DEFAULT_AWS_REGION
249
+
250
+
251
+ def _get_default_sagemaker_session ():
252
+ """Return the default sagemaker session"""
253
+ boto_session = boto3 .Session (region_name = DEFAULT_AWS_REGION )
254
+ sagemaker_session = Session (boto_session = boto_session )
255
+
256
+ return sagemaker_session
0 commit comments