29
29
__version__ = "1.16.0"
30
30
31
31
logger = logging .getLogger (__name__ )
32
+ _CLOUD_SHELL_USER = "current_cloud_shell_user"
32
33
33
34
34
35
def extract_certs (public_cert_content ):
@@ -954,6 +955,19 @@ def get_accounts(self, username=None):
954
955
# Even in the rare case when an RT is revoked and then removed,
955
956
# acquire_token_silent() would then yield no result,
956
957
# apps would fall back to other acquire methods. This is the standard pattern.
958
+ if _is_running_in_cloud_shell ():
959
+ # In Cloud Shell, user already signed in with an account.
960
+ # We pretend we have that account, for acquire_token_silent() to work.
961
+ # Note: If user acquire_token_by_xyz() using that account in MSAL later,
962
+ # the get_accounts() would return multiple accounts to calling app.
963
+ accounts .insert (0 , {
964
+ "home_account_id" : _CLOUD_SHELL_USER ,
965
+ "environment" : "" ,
966
+ "realm" : "" ,
967
+ "local_account_id" : _CLOUD_SHELL_USER ,
968
+ "username" : "Current Cloud Shell User" ,
969
+ "authority_type" : TokenCache .AuthorityType .MSSTS ,
970
+ })
957
971
return accounts
958
972
959
973
def _find_msal_accounts (self , environment ):
@@ -1112,6 +1126,12 @@ def acquire_token_silent_with_error(
1112
1126
"""
1113
1127
assert isinstance (scopes , list ), "Invalid parameter type"
1114
1128
self ._validate_ssh_cert_input_data (kwargs .get ("data" , {}))
1129
+ if account and account .get ("home_account_id" ) == _CLOUD_SHELL_USER :
1130
+ # Since we don't currently store cloud shell tokens in MSAL's cache,
1131
+ # we can have a shortcut here, and semantically bypass all those
1132
+ # _acquire_token_silent_from_cache_and_possibly_refresh_it()
1133
+ return self ._acquire_token_by_cloud_shell (
1134
+ scopes , data = kwargs .get ("data" , {}))
1115
1135
correlation_id = msal .telemetry ._get_new_correlation_id ()
1116
1136
if authority :
1117
1137
warnings .warn ("We haven't decided how/if this method will accept authority parameter" )
@@ -1204,6 +1224,11 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it(
1204
1224
refresh_reason = msal .telemetry .FORCE_REFRESH # TODO: It could also mean claims_challenge
1205
1225
assert refresh_reason , "It should have been established at this point"
1206
1226
try :
1227
+ ## When/if we will store Cloud Shell tokens into MSAL's token cache,
1228
+ # then we will add the following code snippet here.
1229
+ #if account and account.get("home_account_id") == _CLOUD_SHELL_USER:
1230
+ # result = self._acquire_token_by_cloud_shell(scopes, **kwargs)
1231
+ #else:
1207
1232
result = _clean_up (self ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family (
1208
1233
authority , self ._decorate_scope (scopes ), account ,
1209
1234
refresh_reason = refresh_reason , claims_challenge = claims_challenge ,
@@ -1216,6 +1241,24 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it(
1216
1241
raise # We choose to bubble up the exception
1217
1242
return access_token_from_cache
1218
1243
1244
+ def _acquire_token_by_cloud_shell (self , scopes , ** kwargs ):
1245
+ kwargs .pop ("correlation_id" , None ) # IMDS does not use correlation_id
1246
+ resp = self .http_client .post (
1247
+ "http://localhost:50342/oauth2/token" ,
1248
+ data = dict (kwargs .pop ("data" , {}), resource = " " .join (scopes )),
1249
+ headers = dict (kwargs .pop ("headers" , {}), Metadata = "true" ),
1250
+ ** kwargs )
1251
+ if resp .status_code >= 300 :
1252
+ logger .debug ("Cloud Shell IMDS error: %s" , resp .text )
1253
+ cs_error = json .loads (resp .text ).get ("error" , {})
1254
+ return {k : v for k , v in {
1255
+ "error" : cs_error .get ("code" ),
1256
+ "error_description" : cs_error .get ("message" ),
1257
+ }.items () if v }
1258
+ else :
1259
+ # Skip token cache, for now. Cloud Shell IMDS has its own cache anyway.
1260
+ return json .loads (resp .text )
1261
+
1219
1262
def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family (
1220
1263
self , authority , scopes , account , ** kwargs ):
1221
1264
query = {
0 commit comments