Skip to content

Commit 31b24af

Browse files
committed
Implementing Telemetry V4
Implement Telemetry's app-wide state Test cases for telemetry id on most public methods Test telemetry buffer for offline states
1 parent 9d29158 commit 31b24af

File tree

4 files changed

+346
-113
lines changed

4 files changed

+346
-113
lines changed

msal/application.py

Lines changed: 86 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import logging
99
import sys
1010
import warnings
11-
import uuid
11+
from threading import Lock
1212

1313
import requests
1414

@@ -18,6 +18,7 @@
1818
from .wstrust_request import send_request as wst_send_request
1919
from .wstrust_response import *
2020
from .token_cache import TokenCache
21+
import msal.telemetry
2122

2223

2324
# The __init__.py will import this. Not the other way around.
@@ -52,18 +53,6 @@ def decorate_scope(
5253
decorated = scope_set | reserved_scope
5354
return list(decorated)
5455

55-
CLIENT_REQUEST_ID = 'client-request-id'
56-
CLIENT_CURRENT_TELEMETRY = 'x-client-current-telemetry'
57-
58-
def _get_new_correlation_id():
59-
correlation_id = str(uuid.uuid4())
60-
logger.debug("Generates correlation_id: %s", correlation_id)
61-
return correlation_id
62-
63-
64-
def _build_current_telemetry_request_header(public_api_id, force_refresh=False):
65-
return "1|{},{}|".format(public_api_id, "1" if force_refresh else "0")
66-
6756

6857
def extract_certs(public_cert_content):
6958
# Parses raw public certificate file contents and returns a list of strings
@@ -257,6 +246,14 @@ def __init__(
257246
self.token_cache = token_cache or TokenCache()
258247
self.client = self._build_client(client_credential, self.authority)
259248
self.authority_groups = None
249+
self._telemetry_buffer = {}
250+
self._telemetry_lock = Lock()
251+
252+
def _build_telemetry_context(
253+
self, api_id, correlation_id=None, refresh_reason=None):
254+
return msal.telemetry._TelemetryContext(
255+
self._telemetry_buffer, self._telemetry_lock, api_id,
256+
correlation_id=correlation_id, refresh_reason=refresh_reason)
260257

261258
def _build_client(self, client_credential, authority):
262259
client_assertion = None
@@ -520,21 +517,21 @@ def authorize(): # A controller in a web app
520517
return redirect(url_for("index"))
521518
"""
522519
self._validate_ssh_cert_input_data(kwargs.get("data", {}))
523-
return _clean_up(self.client.obtain_token_by_auth_code_flow(
520+
telemetry_context = self._build_telemetry_context(
521+
self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
522+
response =_clean_up(self.client.obtain_token_by_auth_code_flow(
524523
auth_code_flow,
525524
auth_response,
526525
scope=decorate_scope(scopes, self.client_id) if scopes else None,
527-
headers={
528-
CLIENT_REQUEST_ID: _get_new_correlation_id(),
529-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
530-
self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID),
531-
},
526+
headers=telemetry_context.generate_headers(),
532527
data=dict(
533528
kwargs.pop("data", {}),
534529
claims=_merge_claims_challenge_and_capabilities(
535530
self._client_capabilities,
536531
auth_code_flow.pop("claims_challenge", None))),
537532
**kwargs))
533+
telemetry_context.update_telemetry(response)
534+
return response
538535

539536
def acquire_token_by_authorization_code(
540537
self,
@@ -593,20 +590,20 @@ def acquire_token_by_authorization_code(
593590
"Change your acquire_token_by_authorization_code() "
594591
"to acquire_token_by_auth_code_flow()", DeprecationWarning)
595592
with warnings.catch_warnings(record=True):
596-
return _clean_up(self.client.obtain_token_by_authorization_code(
593+
telemetry_context = self._build_telemetry_context(
594+
self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
595+
response = _clean_up(self.client.obtain_token_by_authorization_code(
597596
code, redirect_uri=redirect_uri,
598597
scope=decorate_scope(scopes, self.client_id),
599-
headers={
600-
CLIENT_REQUEST_ID: _get_new_correlation_id(),
601-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
602-
self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID),
603-
},
598+
headers=telemetry_context.generate_headers(),
604599
data=dict(
605600
kwargs.pop("data", {}),
606601
claims=_merge_claims_challenge_and_capabilities(
607602
self._client_capabilities, claims_challenge)),
608603
nonce=nonce,
609604
**kwargs))
605+
telemetry_context.update_telemetry(resposne)
606+
return response
610607

611608
def get_accounts(self, username=None):
612609
"""Get a list of accounts which previously signed in, i.e. exists in cache.
@@ -735,7 +732,7 @@ def acquire_token_silent(
735732
- None when cache lookup does not yield a token.
736733
"""
737734
result = self.acquire_token_silent_with_error(
738-
scopes, account, authority, force_refresh,
735+
scopes, account, authority=authority, force_refresh=force_refresh,
739736
claims_challenge=claims_challenge, **kwargs)
740737
return result if result and "error" not in result else None
741738

@@ -780,7 +777,7 @@ def acquire_token_silent_with_error(
780777
"""
781778
assert isinstance(scopes, list), "Invalid parameter type"
782779
self._validate_ssh_cert_input_data(kwargs.get("data", {}))
783-
correlation_id = _get_new_correlation_id()
780+
correlation_id = msal.telemetry._get_new_correlation_id()
784781
if authority:
785782
warnings.warn("We haven't decided how/if this method will accept authority parameter")
786783
# the_authority = Authority(
@@ -851,9 +848,11 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it(
851848
target=scopes,
852849
query=query)
853850
now = time.time()
851+
refresh_reason = msal.telemetry.AT_ABSENT
854852
for entry in matches:
855853
expires_in = int(entry["expires_on"]) - now
856854
if expires_in < 5*60: # Then consider it expired
855+
refresh_reason = msal.telemetry.AT_EXPIRED
857856
continue # Removal is not necessary, it will be overwritten
858857
logger.debug("Cache hit an AT")
859858
access_token_from_cache = { # Mimic a real response
@@ -862,13 +861,18 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it(
862861
"expires_in": int(expires_in), # OAuth2 specs defines it as int
863862
}
864863
if "refresh_on" in entry and int(entry["refresh_on"]) < now: # aging
864+
refresh_reason = msal.telemetry.AT_AGING
865865
break # With a fallback in hand, we break here to go refresh
866+
self._build_telemetry_context(-1).hit_an_access_token()
866867
return access_token_from_cache # It is still good as new
868+
else:
869+
refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge
870+
assert refresh_reason, "It should have been established at this point"
867871
try:
868-
result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
872+
result = _clean_up(self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
869873
authority, decorate_scope(scopes, self.client_id), account,
870-
force_refresh=force_refresh, claims_challenge=claims_challenge, **kwargs)
871-
result = _clean_up(result)
874+
refresh_reason=refresh_reason, claims_challenge=claims_challenge,
875+
**kwargs))
872876
if (result and "error" not in result) or (not access_token_from_cache):
873877
return result
874878
except: # The exact HTTP exception is transportation-layer dependent
@@ -922,7 +926,8 @@ def _get_app_metadata(self, environment):
922926
def _acquire_token_silent_by_finding_specific_refresh_token(
923927
self, authority, scopes, query,
924928
rt_remover=None, break_condition=lambda response: False,
925-
force_refresh=False, correlation_id=None, claims_challenge=None, **kwargs):
929+
refresh_reason=None, correlation_id=None, claims_challenge=None,
930+
**kwargs):
926931
matches = self.token_cache.find(
927932
self.token_cache.CredentialType.REFRESH_TOKEN,
928933
# target=scopes, # AAD RTs are scope-independent
@@ -931,6 +936,9 @@ def _acquire_token_silent_by_finding_specific_refresh_token(
931936
client = self._build_client(self.client_credential, authority)
932937

933938
response = None # A distinguishable value to mean cache is empty
939+
telemetry_context = self._build_telemetry_context(
940+
self.ACQUIRE_TOKEN_SILENT_ID,
941+
correlation_id=correlation_id, refresh_reason=refresh_reason)
934942
for entry in sorted( # Since unfit RTs would not be aggressively removed,
935943
# we start from newer RTs which are more likely fit.
936944
matches,
@@ -948,16 +956,13 @@ def _acquire_token_silent_by_finding_specific_refresh_token(
948956
skip_account_creation=True, # To honor a concurrent remove_account()
949957
)),
950958
scope=scopes,
951-
headers={
952-
CLIENT_REQUEST_ID: correlation_id or _get_new_correlation_id(),
953-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
954-
self.ACQUIRE_TOKEN_SILENT_ID, force_refresh=force_refresh),
955-
},
959+
headers=telemetry_context.generate_headers(),
956960
data=dict(
957961
kwargs.pop("data", {}),
958962
claims=_merge_claims_challenge_and_capabilities(
959963
self._client_capabilities, claims_challenge)),
960964
**kwargs)
965+
telemetry_context.update_telemetry(response)
961966
if "error" not in response:
962967
return response
963968
logger.debug("Refresh failed. {error}: {error_description}".format(
@@ -1006,18 +1011,19 @@ def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
10061011
* A dict contains no "error" key means migration was successful.
10071012
"""
10081013
self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1009-
return _clean_up(self.client.obtain_token_by_refresh_token(
1014+
telemetry_context = self._build_telemetry_context(
1015+
self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
1016+
refresh_reason=msal.telemetry.FORCE_REFRESH)
1017+
response = _clean_up(self.client.obtain_token_by_refresh_token(
10101018
refresh_token,
10111019
scope=decorate_scope(scopes, self.client_id),
1012-
headers={
1013-
CLIENT_REQUEST_ID: _get_new_correlation_id(),
1014-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
1015-
self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN),
1016-
},
1020+
headers=telemetry_context.generate_headers(),
10171021
rt_getter=lambda rt: rt,
10181022
on_updating_rt=False,
10191023
on_removing_rt=lambda rt_item: None, # No OP
10201024
**kwargs))
1025+
telemetry_context.update_telemetry(response)
1026+
return response
10211027

10221028

10231029
class PublicClientApplication(ClientApplication): # browser app or mobile app
@@ -1093,7 +1099,9 @@ def acquire_token_interactive(
10931099
self._validate_ssh_cert_input_data(kwargs.get("data", {}))
10941100
claims = _merge_claims_challenge_and_capabilities(
10951101
self._client_capabilities, claims_challenge)
1096-
return _clean_up(self.client.obtain_token_by_browser(
1102+
telemetry_context = self._build_telemetry_context(
1103+
self.ACQUIRE_TOKEN_INTERACTIVE)
1104+
response = _clean_up(self.client.obtain_token_by_browser(
10971105
scope=decorate_scope(scopes, self.client_id) if scopes else None,
10981106
extra_scope_to_consent=extra_scopes_to_consent,
10991107
redirect_uri="http://localhost:{port}".format(
@@ -1107,12 +1115,10 @@ def acquire_token_interactive(
11071115
"domain_hint": domain_hint,
11081116
},
11091117
data=dict(kwargs.pop("data", {}), claims=claims),
1110-
headers={
1111-
CLIENT_REQUEST_ID: _get_new_correlation_id(),
1112-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
1113-
self.ACQUIRE_TOKEN_INTERACTIVE),
1114-
},
1118+
headers=telemetry_context.generate_headers(),
11151119
**kwargs))
1120+
telemetry_context.update_telemetry(response)
1121+
return response
11161122

11171123
def initiate_device_flow(self, scopes=None, **kwargs):
11181124
"""Initiate a Device Flow instance,
@@ -1125,13 +1131,10 @@ def initiate_device_flow(self, scopes=None, **kwargs):
11251131
- A successful response would contain "user_code" key, among others
11261132
- an error response would contain some other readable key/value pairs.
11271133
"""
1128-
correlation_id = _get_new_correlation_id()
1134+
correlation_id = msal.telemetry._get_new_correlation_id()
11291135
flow = self.client.initiate_device_flow(
11301136
scope=decorate_scope(scopes or [], self.client_id),
1131-
headers={
1132-
CLIENT_REQUEST_ID: correlation_id,
1133-
# CLIENT_CURRENT_TELEMETRY is not currently required
1134-
},
1137+
headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id},
11351138
**kwargs)
11361139
flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id
11371140
return flow
@@ -1155,7 +1158,10 @@ def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
11551158
- A successful response would contain "access_token" key,
11561159
- an error response would contain "error" and usually "error_description".
11571160
"""
1158-
return _clean_up(self.client.obtain_token_by_device_flow(
1161+
telemetry_context = self._build_telemetry_context(
1162+
self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
1163+
correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
1164+
response = _clean_up(self.client.obtain_token_by_device_flow(
11591165
flow,
11601166
data=dict(
11611167
kwargs.pop("data", {}),
@@ -1165,13 +1171,10 @@ def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
11651171
claims=_merge_claims_challenge_and_capabilities(
11661172
self._client_capabilities, claims_challenge),
11671173
),
1168-
headers={
1169-
CLIENT_REQUEST_ID:
1170-
flow.get(self.DEVICE_FLOW_CORRELATION_ID) or _get_new_correlation_id(),
1171-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
1172-
self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID),
1173-
},
1174+
headers=telemetry_context.generate_headers(),
11741175
**kwargs))
1176+
telemetry_context.update_telemetry(response)
1177+
return response
11751178

11761179
def acquire_token_by_username_password(
11771180
self, username, password, scopes, claims_challenge=None, **kwargs):
@@ -1196,28 +1199,30 @@ def acquire_token_by_username_password(
11961199
- an error response would contain "error" and usually "error_description".
11971200
"""
11981201
scopes = decorate_scope(scopes, self.client_id)
1199-
headers = {
1200-
CLIENT_REQUEST_ID: _get_new_correlation_id(),
1201-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
1202-
self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID),
1203-
}
1202+
telemetry_context = self._build_telemetry_context(
1203+
self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
1204+
headers = telemetry_context.generate_headers()
12041205
data = dict(
12051206
kwargs.pop("data", {}),
12061207
claims=_merge_claims_challenge_and_capabilities(
12071208
self._client_capabilities, claims_challenge))
12081209
if not self.authority.is_adfs:
12091210
user_realm_result = self.authority.user_realm_discovery(
1210-
username, correlation_id=headers[CLIENT_REQUEST_ID])
1211+
username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
12111212
if user_realm_result.get("account_type") == "Federated":
1212-
return _clean_up(self._acquire_token_by_username_password_federated(
1213+
response = _clean_up(self._acquire_token_by_username_password_federated(
12131214
user_realm_result, username, password, scopes=scopes,
12141215
data=data,
12151216
headers=headers, **kwargs))
1216-
return _clean_up(self.client.obtain_token_by_username_password(
1217+
telemetry_context.update_telemetry(response)
1218+
return response
1219+
response = _clean_up(self.client.obtain_token_by_username_password(
12171220
username, password, scope=scopes,
12181221
headers=headers,
12191222
data=data,
12201223
**kwargs))
1224+
telemetry_context.update_telemetry(response)
1225+
return response
12211226

12221227
def _acquire_token_by_username_password_federated(
12231228
self, user_realm_result, username, password, scopes=None, **kwargs):
@@ -1277,18 +1282,18 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
12771282
"""
12781283
# TBD: force_refresh behavior
12791284
self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1280-
return _clean_up(self.client.obtain_token_for_client(
1285+
telemetry_context = self._build_telemetry_context(
1286+
self.ACQUIRE_TOKEN_FOR_CLIENT_ID)
1287+
response = _clean_up(self.client.obtain_token_for_client(
12811288
scope=scopes, # This grant flow requires no scope decoration
1282-
headers={
1283-
CLIENT_REQUEST_ID: _get_new_correlation_id(),
1284-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
1285-
self.ACQUIRE_TOKEN_FOR_CLIENT_ID),
1286-
},
1289+
headers=telemetry_context.generate_headers(),
12871290
data=dict(
12881291
kwargs.pop("data", {}),
12891292
claims=_merge_claims_challenge_and_capabilities(
12901293
self._client_capabilities, claims_challenge)),
12911294
**kwargs))
1295+
telemetry_context.update_telemetry(response)
1296+
return response
12921297

12931298
def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
12941299
"""Acquires token using on-behalf-of (OBO) flow.
@@ -1316,9 +1321,11 @@ def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=No
13161321
- A successful response would contain "access_token" key,
13171322
- an error response would contain "error" and usually "error_description".
13181323
"""
1324+
telemetry_context = self._build_telemetry_context(
1325+
self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
13191326
# The implementation is NOT based on Token Exchange
13201327
# https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16
1321-
return _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521
1328+
response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521
13221329
user_assertion,
13231330
self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs
13241331
scope=decorate_scope(scopes, self.client_id), # Decoration is used for:
@@ -1332,9 +1339,8 @@ def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=No
13321339
requested_token_use="on_behalf_of",
13331340
claims=_merge_claims_challenge_and_capabilities(
13341341
self._client_capabilities, claims_challenge)),
1335-
headers={
1336-
CLIENT_REQUEST_ID: _get_new_correlation_id(),
1337-
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
1338-
self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID),
1339-
},
1342+
headers=telemetry_context.generate_headers(),
13401343
**kwargs))
1344+
telemetry_context.update_telemetry(response)
1345+
return response
1346+

0 commit comments

Comments
 (0)