-
Notifications
You must be signed in to change notification settings - Fork 205
MSAL Python 1.18.0b1 #471
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MSAL Python 1.18.0b1 #471
Changes from all commits
22fb44a
b3f4410
f814050
b37c083
da0a463
e717407
0a62ead
14965f7
4397477
a3acb32
1b5d2d6
149e5fc
c7e81ba
292e28b
ea18829
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# All rights reserved. | ||
# | ||
# This code is licensed under the MIT License. | ||
|
||
"""This module wraps Cloud Shell's IMDS-like interface inside an OAuth2-like helper""" | ||
import base64 | ||
import json | ||
import logging | ||
import os | ||
import time | ||
try: # Python 2 | ||
from urlparse import urlparse | ||
except: # Python 3 | ||
from urllib.parse import urlparse | ||
from .oauth2cli.oidc import decode_part | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def _is_running_in_cloud_shell(): | ||
return os.environ.get("AZUREPS_HOST_ENVIRONMENT", "").startswith("cloud-shell") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just as an FYI, Azure CLI uses another env var def in_cloud_console():
return os.environ.get('ACC_CLOUD', None)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dug this out from my chat messages history.
|
||
|
||
|
||
def _scope_to_resource(scope): # This is an experimental reasonable-effort approach | ||
cloud_shell_supported_audiences = [ | ||
"https://analysis.windows.net/powerbi/api", # Came from https://msazure.visualstudio.com/One/_git/compute-CloudShell?path=/src/images/agent/env/envconfig.PROD.json | ||
"https://pas.windows.net/CheckMyAccess/Linux/.default", # Cloud Shell accepts it as-is | ||
] | ||
for a in cloud_shell_supported_audiences: | ||
if scope.startswith(a): | ||
return a | ||
u = urlparse(scope) | ||
if u.scheme: | ||
return "{}://{}".format(u.scheme, u.netloc) | ||
return scope # There is no much else we can do here | ||
|
||
|
||
def _obtain_token(http_client, scopes, client_id=None, data=None): | ||
resp = http_client.post( | ||
"http://localhost:50342/oauth2/token", | ||
data=dict( | ||
data or {}, | ||
resource=" ".join(map(_scope_to_resource, scopes))), | ||
headers={"Metadata": "true"}, | ||
) | ||
if resp.status_code >= 300: | ||
logger.debug("Cloud Shell IMDS error: %s", resp.text) | ||
cs_error = json.loads(resp.text).get("error", {}) | ||
return {k: v for k, v in { | ||
"error": cs_error.get("code"), | ||
"error_description": cs_error.get("message"), | ||
}.items() if v} | ||
imds_payload = json.loads(resp.text) | ||
BEARER = "Bearer" | ||
oauth2_response = { | ||
"access_token": imds_payload["access_token"], | ||
"expires_in": int(imds_payload["expires_in"]), | ||
"token_type": imds_payload.get("token_type", BEARER), | ||
} | ||
expected_token_type = (data or {}).get("token_type", BEARER) | ||
if oauth2_response["token_type"] != expected_token_type: | ||
return { # Generate a normal error (rather than an intrusive exception) | ||
"error": "broker_error", | ||
"error_description": "token_type {} is not supported by this version of Azure Portal".format( | ||
expected_token_type), | ||
} | ||
parts = imds_payload["access_token"].split(".") | ||
|
||
# The following default values are useful in SSH Cert scenario | ||
client_info = { # Default value, in case the real value will be unavailable | ||
"uid": "user", | ||
"utid": "cloudshell", | ||
} | ||
now = time.time() | ||
preferred_username = "currentuser@cloudshell" | ||
oauth2_response["id_token_claims"] = { # First 5 claims are required per OIDC | ||
"iss": "cloudshell", | ||
"sub": "user", | ||
"aud": client_id, | ||
"exp": now + 3600, | ||
"iat": now, | ||
"preferred_username": preferred_username, # Useful as MSAL account's username | ||
} | ||
|
||
if len(parts) == 3: # Probably a JWT. Use it to derive client_info and id token. | ||
try: | ||
# Data defined in https://docs.microsoft.com/en-us/azure/active-directory/develop/access-tokens#payload-claims | ||
jwt_payload = json.loads(decode_part(parts[1])) | ||
client_info = { | ||
# Mimic a real home_account_id, | ||
# so that this pseudo account and a real account would interop. | ||
"uid": jwt_payload.get("oid", "user"), | ||
"utid": jwt_payload.get("tid", "cloudshell"), | ||
} | ||
oauth2_response["id_token_claims"] = { | ||
"iss": jwt_payload["iss"], | ||
"sub": jwt_payload["sub"], # Could use oid instead | ||
"aud": client_id, | ||
"exp": jwt_payload["exp"], | ||
"iat": jwt_payload["iat"], | ||
"preferred_username": jwt_payload.get("preferred_username") # V2 | ||
or jwt_payload.get("unique_name") # V1 | ||
or preferred_username, | ||
} | ||
except ValueError: | ||
logger.debug("Unable to decode jwt payload: %s", parts[1]) | ||
oauth2_response["client_info"] = base64.b64encode( | ||
# Mimic a client_info, so that MSAL would create an account | ||
json.dumps(client_info).encode("utf-8")).decode("utf-8") | ||
oauth2_response["id_token_claims"]["tid"] = client_info["utid"] # TBD | ||
|
||
## Note: Decided to not surface resource back as scope, | ||
## because they would cause the downstream OAuth2 code path to | ||
## cache the token with a different scope and won't hit them later. | ||
#if imds_payload.get("resource"): | ||
# oauth2_response["scope"] = imds_payload["resource"] | ||
if imds_payload.get("refresh_token"): | ||
oauth2_response["refresh_token"] = imds_payload["refresh_token"] | ||
return oauth2_response | ||
|
Uh oh!
There was an error while loading. Please reload this page.