8
8
import logging
9
9
import sys
10
10
import warnings
11
- import uuid
11
+ from threading import Lock
12
12
13
13
import requests
14
14
18
18
from .wstrust_request import send_request as wst_send_request
19
19
from .wstrust_response import *
20
20
from .token_cache import TokenCache
21
+ import msal .telemetry
21
22
22
23
23
24
# The __init__.py will import this. Not the other way around.
@@ -52,18 +53,6 @@ def decorate_scope(
52
53
decorated = scope_set | reserved_scope
53
54
return list (decorated )
54
55
55
- CLIENT_REQUEST_ID = 'client-request-id'
56
- CLIENT_CURRENT_TELEMETRY = 'x-client-current-telemetry'
57
-
58
- def _get_new_correlation_id ():
59
- correlation_id = str (uuid .uuid4 ())
60
- logger .debug ("Generates correlation_id: %s" , correlation_id )
61
- return correlation_id
62
-
63
-
64
- def _build_current_telemetry_request_header (public_api_id , force_refresh = False ):
65
- return "1|{},{}|" .format (public_api_id , "1" if force_refresh else "0" )
66
-
67
56
68
57
def extract_certs (public_cert_content ):
69
58
# Parses raw public certificate file contents and returns a list of strings
@@ -257,6 +246,14 @@ def __init__(
257
246
self .token_cache = token_cache or TokenCache ()
258
247
self .client = self ._build_client (client_credential , self .authority )
259
248
self .authority_groups = None
249
+ self ._telemetry_buffer = {}
250
+ self ._telemetry_lock = Lock ()
251
+
252
+ def _build_telemetry_context (
253
+ self , api_id , correlation_id = None , refresh_reason = None ):
254
+ return msal .telemetry ._TelemetryContext (
255
+ self ._telemetry_buffer , self ._telemetry_lock , api_id ,
256
+ correlation_id = correlation_id , refresh_reason = refresh_reason )
260
257
261
258
def _build_client (self , client_credential , authority ):
262
259
client_assertion = None
@@ -520,21 +517,21 @@ def authorize(): # A controller in a web app
520
517
return redirect(url_for("index"))
521
518
"""
522
519
self ._validate_ssh_cert_input_data (kwargs .get ("data" , {}))
523
- return _clean_up (self .client .obtain_token_by_auth_code_flow (
520
+ telemetry_context = self ._build_telemetry_context (
521
+ self .ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID )
522
+ response = _clean_up (self .client .obtain_token_by_auth_code_flow (
524
523
auth_code_flow ,
525
524
auth_response ,
526
525
scope = decorate_scope (scopes , self .client_id ) if scopes else None ,
527
- headers = {
528
- CLIENT_REQUEST_ID : _get_new_correlation_id (),
529
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
530
- self .ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID ),
531
- },
526
+ headers = telemetry_context .generate_headers (),
532
527
data = dict (
533
528
kwargs .pop ("data" , {}),
534
529
claims = _merge_claims_challenge_and_capabilities (
535
530
self ._client_capabilities ,
536
531
auth_code_flow .pop ("claims_challenge" , None ))),
537
532
** kwargs ))
533
+ telemetry_context .update_telemetry (response )
534
+ return response
538
535
539
536
def acquire_token_by_authorization_code (
540
537
self ,
@@ -593,20 +590,20 @@ def acquire_token_by_authorization_code(
593
590
"Change your acquire_token_by_authorization_code() "
594
591
"to acquire_token_by_auth_code_flow()" , DeprecationWarning )
595
592
with warnings .catch_warnings (record = True ):
596
- return _clean_up (self .client .obtain_token_by_authorization_code (
593
+ telemetry_context = self ._build_telemetry_context (
594
+ self .ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID )
595
+ response = _clean_up (self .client .obtain_token_by_authorization_code (
597
596
code , redirect_uri = redirect_uri ,
598
597
scope = decorate_scope (scopes , self .client_id ),
599
- headers = {
600
- CLIENT_REQUEST_ID : _get_new_correlation_id (),
601
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
602
- self .ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID ),
603
- },
598
+ headers = telemetry_context .generate_headers (),
604
599
data = dict (
605
600
kwargs .pop ("data" , {}),
606
601
claims = _merge_claims_challenge_and_capabilities (
607
602
self ._client_capabilities , claims_challenge )),
608
603
nonce = nonce ,
609
604
** kwargs ))
605
+ telemetry_context .update_telemetry (resposne )
606
+ return response
610
607
611
608
def get_accounts (self , username = None ):
612
609
"""Get a list of accounts which previously signed in, i.e. exists in cache.
@@ -735,7 +732,7 @@ def acquire_token_silent(
735
732
- None when cache lookup does not yield a token.
736
733
"""
737
734
result = self .acquire_token_silent_with_error (
738
- scopes , account , authority , force_refresh ,
735
+ scopes , account , authority = authority , force_refresh = force_refresh ,
739
736
claims_challenge = claims_challenge , ** kwargs )
740
737
return result if result and "error" not in result else None
741
738
@@ -780,7 +777,7 @@ def acquire_token_silent_with_error(
780
777
"""
781
778
assert isinstance (scopes , list ), "Invalid parameter type"
782
779
self ._validate_ssh_cert_input_data (kwargs .get ("data" , {}))
783
- correlation_id = _get_new_correlation_id ()
780
+ correlation_id = msal . telemetry . _get_new_correlation_id ()
784
781
if authority :
785
782
warnings .warn ("We haven't decided how/if this method will accept authority parameter" )
786
783
# the_authority = Authority(
@@ -851,9 +848,11 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it(
851
848
target = scopes ,
852
849
query = query )
853
850
now = time .time ()
851
+ refresh_reason = msal .telemetry .AT_ABSENT
854
852
for entry in matches :
855
853
expires_in = int (entry ["expires_on" ]) - now
856
854
if expires_in < 5 * 60 : # Then consider it expired
855
+ refresh_reason = msal .telemetry .AT_EXPIRED
857
856
continue # Removal is not necessary, it will be overwritten
858
857
logger .debug ("Cache hit an AT" )
859
858
access_token_from_cache = { # Mimic a real response
@@ -862,13 +861,18 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it(
862
861
"expires_in" : int (expires_in ), # OAuth2 specs defines it as int
863
862
}
864
863
if "refresh_on" in entry and int (entry ["refresh_on" ]) < now : # aging
864
+ refresh_reason = msal .telemetry .AT_AGING
865
865
break # With a fallback in hand, we break here to go refresh
866
+ self ._build_telemetry_context (- 1 ).hit_an_access_token ()
866
867
return access_token_from_cache # It is still good as new
868
+ else :
869
+ refresh_reason = msal .telemetry .FORCE_REFRESH # TODO: It could also mean claims_challenge
870
+ assert refresh_reason , "It should have been established at this point"
867
871
try :
868
- result = self ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family (
872
+ result = _clean_up ( self ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family (
869
873
authority , decorate_scope (scopes , self .client_id ), account ,
870
- force_refresh = force_refresh , claims_challenge = claims_challenge , ** kwargs )
871
- result = _clean_up ( result )
874
+ refresh_reason = refresh_reason , claims_challenge = claims_challenge ,
875
+ ** kwargs ) )
872
876
if (result and "error" not in result ) or (not access_token_from_cache ):
873
877
return result
874
878
except : # The exact HTTP exception is transportation-layer dependent
@@ -922,7 +926,8 @@ def _get_app_metadata(self, environment):
922
926
def _acquire_token_silent_by_finding_specific_refresh_token (
923
927
self , authority , scopes , query ,
924
928
rt_remover = None , break_condition = lambda response : False ,
925
- force_refresh = False , correlation_id = None , claims_challenge = None , ** kwargs ):
929
+ refresh_reason = None , correlation_id = None , claims_challenge = None ,
930
+ ** kwargs ):
926
931
matches = self .token_cache .find (
927
932
self .token_cache .CredentialType .REFRESH_TOKEN ,
928
933
# target=scopes, # AAD RTs are scope-independent
@@ -931,6 +936,9 @@ def _acquire_token_silent_by_finding_specific_refresh_token(
931
936
client = self ._build_client (self .client_credential , authority )
932
937
933
938
response = None # A distinguishable value to mean cache is empty
939
+ telemetry_context = self ._build_telemetry_context (
940
+ self .ACQUIRE_TOKEN_SILENT_ID ,
941
+ correlation_id = correlation_id , refresh_reason = refresh_reason )
934
942
for entry in sorted ( # Since unfit RTs would not be aggressively removed,
935
943
# we start from newer RTs which are more likely fit.
936
944
matches ,
@@ -948,16 +956,13 @@ def _acquire_token_silent_by_finding_specific_refresh_token(
948
956
skip_account_creation = True , # To honor a concurrent remove_account()
949
957
)),
950
958
scope = scopes ,
951
- headers = {
952
- CLIENT_REQUEST_ID : correlation_id or _get_new_correlation_id (),
953
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
954
- self .ACQUIRE_TOKEN_SILENT_ID , force_refresh = force_refresh ),
955
- },
959
+ headers = telemetry_context .generate_headers (),
956
960
data = dict (
957
961
kwargs .pop ("data" , {}),
958
962
claims = _merge_claims_challenge_and_capabilities (
959
963
self ._client_capabilities , claims_challenge )),
960
964
** kwargs )
965
+ telemetry_context .update_telemetry (response )
961
966
if "error" not in response :
962
967
return response
963
968
logger .debug ("Refresh failed. {error}: {error_description}" .format (
@@ -1006,18 +1011,19 @@ def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
1006
1011
* A dict contains no "error" key means migration was successful.
1007
1012
"""
1008
1013
self ._validate_ssh_cert_input_data (kwargs .get ("data" , {}))
1009
- return _clean_up (self .client .obtain_token_by_refresh_token (
1014
+ telemetry_context = self ._build_telemetry_context (
1015
+ self .ACQUIRE_TOKEN_BY_REFRESH_TOKEN ,
1016
+ refresh_reason = msal .telemetry .FORCE_REFRESH )
1017
+ response = _clean_up (self .client .obtain_token_by_refresh_token (
1010
1018
refresh_token ,
1011
1019
scope = decorate_scope (scopes , self .client_id ),
1012
- headers = {
1013
- CLIENT_REQUEST_ID : _get_new_correlation_id (),
1014
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
1015
- self .ACQUIRE_TOKEN_BY_REFRESH_TOKEN ),
1016
- },
1020
+ headers = telemetry_context .generate_headers (),
1017
1021
rt_getter = lambda rt : rt ,
1018
1022
on_updating_rt = False ,
1019
1023
on_removing_rt = lambda rt_item : None , # No OP
1020
1024
** kwargs ))
1025
+ telemetry_context .update_telemetry (response )
1026
+ return response
1021
1027
1022
1028
1023
1029
class PublicClientApplication (ClientApplication ): # browser app or mobile app
@@ -1093,7 +1099,9 @@ def acquire_token_interactive(
1093
1099
self ._validate_ssh_cert_input_data (kwargs .get ("data" , {}))
1094
1100
claims = _merge_claims_challenge_and_capabilities (
1095
1101
self ._client_capabilities , claims_challenge )
1096
- return _clean_up (self .client .obtain_token_by_browser (
1102
+ telemetry_context = self ._build_telemetry_context (
1103
+ self .ACQUIRE_TOKEN_INTERACTIVE )
1104
+ response = _clean_up (self .client .obtain_token_by_browser (
1097
1105
scope = decorate_scope (scopes , self .client_id ) if scopes else None ,
1098
1106
extra_scope_to_consent = extra_scopes_to_consent ,
1099
1107
redirect_uri = "http://localhost:{port}" .format (
@@ -1107,12 +1115,10 @@ def acquire_token_interactive(
1107
1115
"domain_hint" : domain_hint ,
1108
1116
},
1109
1117
data = dict (kwargs .pop ("data" , {}), claims = claims ),
1110
- headers = {
1111
- CLIENT_REQUEST_ID : _get_new_correlation_id (),
1112
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
1113
- self .ACQUIRE_TOKEN_INTERACTIVE ),
1114
- },
1118
+ headers = telemetry_context .generate_headers (),
1115
1119
** kwargs ))
1120
+ telemetry_context .update_telemetry (response )
1121
+ return response
1116
1122
1117
1123
def initiate_device_flow (self , scopes = None , ** kwargs ):
1118
1124
"""Initiate a Device Flow instance,
@@ -1125,13 +1131,10 @@ def initiate_device_flow(self, scopes=None, **kwargs):
1125
1131
- A successful response would contain "user_code" key, among others
1126
1132
- an error response would contain some other readable key/value pairs.
1127
1133
"""
1128
- correlation_id = _get_new_correlation_id ()
1134
+ correlation_id = msal . telemetry . _get_new_correlation_id ()
1129
1135
flow = self .client .initiate_device_flow (
1130
1136
scope = decorate_scope (scopes or [], self .client_id ),
1131
- headers = {
1132
- CLIENT_REQUEST_ID : correlation_id ,
1133
- # CLIENT_CURRENT_TELEMETRY is not currently required
1134
- },
1137
+ headers = {msal .telemetry .CLIENT_REQUEST_ID : correlation_id },
1135
1138
** kwargs )
1136
1139
flow [self .DEVICE_FLOW_CORRELATION_ID ] = correlation_id
1137
1140
return flow
@@ -1155,7 +1158,10 @@ def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
1155
1158
- A successful response would contain "access_token" key,
1156
1159
- an error response would contain "error" and usually "error_description".
1157
1160
"""
1158
- return _clean_up (self .client .obtain_token_by_device_flow (
1161
+ telemetry_context = self ._build_telemetry_context (
1162
+ self .ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID ,
1163
+ correlation_id = flow .get (self .DEVICE_FLOW_CORRELATION_ID ))
1164
+ response = _clean_up (self .client .obtain_token_by_device_flow (
1159
1165
flow ,
1160
1166
data = dict (
1161
1167
kwargs .pop ("data" , {}),
@@ -1165,13 +1171,10 @@ def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
1165
1171
claims = _merge_claims_challenge_and_capabilities (
1166
1172
self ._client_capabilities , claims_challenge ),
1167
1173
),
1168
- headers = {
1169
- CLIENT_REQUEST_ID :
1170
- flow .get (self .DEVICE_FLOW_CORRELATION_ID ) or _get_new_correlation_id (),
1171
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
1172
- self .ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID ),
1173
- },
1174
+ headers = telemetry_context .generate_headers (),
1174
1175
** kwargs ))
1176
+ telemetry_context .update_telemetry (response )
1177
+ return response
1175
1178
1176
1179
def acquire_token_by_username_password (
1177
1180
self , username , password , scopes , claims_challenge = None , ** kwargs ):
@@ -1196,28 +1199,30 @@ def acquire_token_by_username_password(
1196
1199
- an error response would contain "error" and usually "error_description".
1197
1200
"""
1198
1201
scopes = decorate_scope (scopes , self .client_id )
1199
- headers = {
1200
- CLIENT_REQUEST_ID : _get_new_correlation_id (),
1201
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
1202
- self .ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID ),
1203
- }
1202
+ telemetry_context = self ._build_telemetry_context (
1203
+ self .ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID )
1204
+ headers = telemetry_context .generate_headers ()
1204
1205
data = dict (
1205
1206
kwargs .pop ("data" , {}),
1206
1207
claims = _merge_claims_challenge_and_capabilities (
1207
1208
self ._client_capabilities , claims_challenge ))
1208
1209
if not self .authority .is_adfs :
1209
1210
user_realm_result = self .authority .user_realm_discovery (
1210
- username , correlation_id = headers [CLIENT_REQUEST_ID ])
1211
+ username , correlation_id = headers [msal . telemetry . CLIENT_REQUEST_ID ])
1211
1212
if user_realm_result .get ("account_type" ) == "Federated" :
1212
- return _clean_up (self ._acquire_token_by_username_password_federated (
1213
+ response = _clean_up (self ._acquire_token_by_username_password_federated (
1213
1214
user_realm_result , username , password , scopes = scopes ,
1214
1215
data = data ,
1215
1216
headers = headers , ** kwargs ))
1216
- return _clean_up (self .client .obtain_token_by_username_password (
1217
+ telemetry_context .update_telemetry (response )
1218
+ return response
1219
+ response = _clean_up (self .client .obtain_token_by_username_password (
1217
1220
username , password , scope = scopes ,
1218
1221
headers = headers ,
1219
1222
data = data ,
1220
1223
** kwargs ))
1224
+ telemetry_context .update_telemetry (response )
1225
+ return response
1221
1226
1222
1227
def _acquire_token_by_username_password_federated (
1223
1228
self , user_realm_result , username , password , scopes = None , ** kwargs ):
@@ -1277,18 +1282,18 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
1277
1282
"""
1278
1283
# TBD: force_refresh behavior
1279
1284
self ._validate_ssh_cert_input_data (kwargs .get ("data" , {}))
1280
- return _clean_up (self .client .obtain_token_for_client (
1285
+ telemetry_context = self ._build_telemetry_context (
1286
+ self .ACQUIRE_TOKEN_FOR_CLIENT_ID )
1287
+ response = _clean_up (self .client .obtain_token_for_client (
1281
1288
scope = scopes , # This grant flow requires no scope decoration
1282
- headers = {
1283
- CLIENT_REQUEST_ID : _get_new_correlation_id (),
1284
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
1285
- self .ACQUIRE_TOKEN_FOR_CLIENT_ID ),
1286
- },
1289
+ headers = telemetry_context .generate_headers (),
1287
1290
data = dict (
1288
1291
kwargs .pop ("data" , {}),
1289
1292
claims = _merge_claims_challenge_and_capabilities (
1290
1293
self ._client_capabilities , claims_challenge )),
1291
1294
** kwargs ))
1295
+ telemetry_context .update_telemetry (response )
1296
+ return response
1292
1297
1293
1298
def acquire_token_on_behalf_of (self , user_assertion , scopes , claims_challenge = None , ** kwargs ):
1294
1299
"""Acquires token using on-behalf-of (OBO) flow.
@@ -1316,9 +1321,11 @@ def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=No
1316
1321
- A successful response would contain "access_token" key,
1317
1322
- an error response would contain "error" and usually "error_description".
1318
1323
"""
1324
+ telemetry_context = self ._build_telemetry_context (
1325
+ self .ACQUIRE_TOKEN_ON_BEHALF_OF_ID )
1319
1326
# The implementation is NOT based on Token Exchange
1320
1327
# https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16
1321
- return _clean_up (self .client .obtain_token_by_assertion ( # bases on assertion RFC 7521
1328
+ response = _clean_up (self .client .obtain_token_by_assertion ( # bases on assertion RFC 7521
1322
1329
user_assertion ,
1323
1330
self .client .GRANT_TYPE_JWT , # IDTs and AAD ATs are all JWTs
1324
1331
scope = decorate_scope (scopes , self .client_id ), # Decoration is used for:
@@ -1332,9 +1339,8 @@ def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=No
1332
1339
requested_token_use = "on_behalf_of" ,
1333
1340
claims = _merge_claims_challenge_and_capabilities (
1334
1341
self ._client_capabilities , claims_challenge )),
1335
- headers = {
1336
- CLIENT_REQUEST_ID : _get_new_correlation_id (),
1337
- CLIENT_CURRENT_TELEMETRY : _build_current_telemetry_request_header (
1338
- self .ACQUIRE_TOKEN_ON_BEHALF_OF_ID ),
1339
- },
1342
+ headers = telemetry_context .generate_headers (),
1340
1343
** kwargs ))
1344
+ telemetry_context .update_telemetry (response )
1345
+ return response
1346
+
0 commit comments