Skip to content

Add ROPC support to Confidential Client #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 85 additions & 85 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,91 @@ def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
telemetry_context.update_telemetry(response)
return response

def acquire_token_by_username_password(
self, username, password, scopes, claims_challenge=None, **kwargs):
"""Gets a token for a given resource via user credentials.

See this page for constraints of Username Password Flow.
https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication

:param str username: Typically a UPN in the form of an email address.
:param str password: The password.
:param list[str] scopes:
Scopes requested to access a protected API (a resource).
:param claims_challenge:
The claims_challenge parameter requests specific claims requested by the resource provider
in the form of a claims_challenge directive in the www-authenticate header to be
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
It is a string of a JSON object which contains lists of claims being requested from these locations.

:return: A dict representing the json response from AAD:

- A successful response would contain "access_token" key,
- an error response would contain "error" and usually "error_description".
"""
scopes = decorate_scope(scopes, self.client_id)
telemetry_context = self._build_telemetry_context(
self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
headers = telemetry_context.generate_headers()
data = dict(
kwargs.pop("data", {}),
claims=_merge_claims_challenge_and_capabilities(
self._client_capabilities, claims_challenge))
if not self.authority.is_adfs:
user_realm_result = self.authority.user_realm_discovery(
username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
if user_realm_result.get("account_type") == "Federated":
response = _clean_up(self._acquire_token_by_username_password_federated(
user_realm_result, username, password, scopes=scopes,
data=data,
headers=headers, **kwargs))
telemetry_context.update_telemetry(response)
return response
response = _clean_up(self.client.obtain_token_by_username_password(
username, password, scope=scopes,
headers=headers,
data=data,
**kwargs))
telemetry_context.update_telemetry(response)
return response

def _acquire_token_by_username_password_federated(
self, user_realm_result, username, password, scopes=None, **kwargs):
wstrust_endpoint = {}
if user_realm_result.get("federation_metadata_url"):
wstrust_endpoint = mex_send_request(
user_realm_result["federation_metadata_url"],
self.http_client)
if wstrust_endpoint is None:
raise ValueError("Unable to find wstrust endpoint from MEX. "
"This typically happens when attempting MSA accounts. "
"More details available here. "
"https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
wstrust_result = wst_send_request(
username, password,
user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
wstrust_endpoint.get("address",
# Fallback to an AAD supplied endpoint
user_realm_result.get("federation_active_auth_url")),
wstrust_endpoint.get("action"), self.http_client)
if not ("token" in wstrust_result and "type" in wstrust_result):
raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
grant_type = {
SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
}.get(wstrust_result.get("type"))
if not grant_type:
raise RuntimeError(
"RSTR returned unknown token type: %s", wstrust_result.get("type"))
self.client.grant_assertion_encoders.setdefault( # Register a non-standard type
grant_type, self.client.encode_saml_assertion)
return self.client.obtain_token_by_assertion(
wstrust_result["token"], grant_type, scope=scopes, **kwargs)


class PublicClientApplication(ClientApplication): # browser app or mobile app

Expand Down Expand Up @@ -1176,91 +1261,6 @@ def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
telemetry_context.update_telemetry(response)
return response

def acquire_token_by_username_password(
self, username, password, scopes, claims_challenge=None, **kwargs):
"""Gets a token for a given resource via user credentials.

See this page for constraints of Username Password Flow.
https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication

:param str username: Typically a UPN in the form of an email address.
:param str password: The password.
:param list[str] scopes:
Scopes requested to access a protected API (a resource).
:param claims_challenge:
The claims_challenge parameter requests specific claims requested by the resource provider
in the form of a claims_challenge directive in the www-authenticate header to be
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
It is a string of a JSON object which contains lists of claims being requested from these locations.

:return: A dict representing the json response from AAD:

- A successful response would contain "access_token" key,
- an error response would contain "error" and usually "error_description".
"""
scopes = decorate_scope(scopes, self.client_id)
telemetry_context = self._build_telemetry_context(
self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
headers = telemetry_context.generate_headers()
data = dict(
kwargs.pop("data", {}),
claims=_merge_claims_challenge_and_capabilities(
self._client_capabilities, claims_challenge))
if not self.authority.is_adfs:
user_realm_result = self.authority.user_realm_discovery(
username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
if user_realm_result.get("account_type") == "Federated":
response = _clean_up(self._acquire_token_by_username_password_federated(
user_realm_result, username, password, scopes=scopes,
data=data,
headers=headers, **kwargs))
telemetry_context.update_telemetry(response)
return response
response = _clean_up(self.client.obtain_token_by_username_password(
username, password, scope=scopes,
headers=headers,
data=data,
**kwargs))
telemetry_context.update_telemetry(response)
return response

def _acquire_token_by_username_password_federated(
self, user_realm_result, username, password, scopes=None, **kwargs):
wstrust_endpoint = {}
if user_realm_result.get("federation_metadata_url"):
wstrust_endpoint = mex_send_request(
user_realm_result["federation_metadata_url"],
self.http_client)
if wstrust_endpoint is None:
raise ValueError("Unable to find wstrust endpoint from MEX. "
"This typically happens when attempting MSA accounts. "
"More details available here. "
"https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
wstrust_result = wst_send_request(
username, password,
user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
wstrust_endpoint.get("address",
# Fallback to an AAD supplied endpoint
user_realm_result.get("federation_active_auth_url")),
wstrust_endpoint.get("action"), self.http_client)
if not ("token" in wstrust_result and "type" in wstrust_result):
raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
grant_type = {
SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
}.get(wstrust_result.get("type"))
if not grant_type:
raise RuntimeError(
"RSTR returned unknown token type: %s", wstrust_result.get("type"))
self.client.grant_assertion_encoders.setdefault( # Register a non-standard type
grant_type, self.client.encode_saml_assertion)
return self.client.obtain_token_by_assertion(
wstrust_result["token"], grant_type, scope=scopes, **kwargs)


class ConfidentialClientApplication(ClientApplication): # server-side web app

Expand Down
3 changes: 2 additions & 1 deletion sample/username_password_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
config = json.load(open(sys.argv[1]))

# Create a preferably long-lived app instance which maintains a token cache.
app = msal.PublicClientApplication(
app = msal.ClientApplication(
config["client_id"], authority=config["authority"],
client_credential=config.get("client_secret"),
# token_cache=... # Default cache is in memory only.
# You can learn how to use SerializableTokenCache from
# https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
Expand Down
34 changes: 32 additions & 2 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,12 @@ def assertCacheWorksForApp(self, result_from_wire, scope):

def _test_username_password(self,
authority=None, client_id=None, username=None, password=None, scope=None,
client_secret=None, # Since MSAL 1.11, confidential client has ROPC too
**ignored):
assert authority and client_id and username and password and scope
self.app = msal.PublicClientApplication(
client_id, authority=authority, http_client=MinimalHttpClient())
self.app = msal.ClientApplication(
client_id, authority=authority, http_client=MinimalHttpClient(),
client_credential=client_secret)
result = self.app.acquire_token_by_username_password(
username, password, scopes=scope)
self.assertLoosely(result)
Expand Down Expand Up @@ -650,6 +652,34 @@ def test_acquire_token_obo(self):

self._test_acquire_token_obo(config_pca, config_cca)

def test_acquire_token_by_client_secret(self):
# This is copied from ArlingtonCloudTestCase's same test case
try:
config = self.get_lab_user(usertype="cloud", publicClient="no")
except requests.exceptions.HTTPError:
self.skipTest("The lab does not provide confidential app for testing")
else:
config["client_secret"] = self.get_lab_user_secret("TBD") # TODO
self._test_acquire_token_by_client_secret(**config)

@unittest.skipUnless(
os.getenv("LAB_OBO_CLIENT_SECRET"),
"Need LAB_OBO_CLIENT_SECRET from https://aka.ms/GetLabSecret?Secret=TodoListServiceV2-OBO")
@unittest.skipUnless(
os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID"),
"Need LAB_OBO_CONFIDENTIAL_CLIENT_ID from https://docs.msidlab.com/flows/onbehalfofflow.html")
def test_confidential_client_acquire_token_by_username_password(self):
# This approach won't work:
# config = self.get_lab_user(usertype="cloud", publicClient="no")
# so we repurpose the obo confidential app to test ROPC
config = self.get_lab_user(usertype="cloud")
config["password"] = self.get_lab_user_secret(config["lab_name"])
# Swap in the OBO confidential app
config["client_id"] = os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID")
config["scope"] = ["https://graph.microsoft.com/.default"]
config["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET")
self._test_username_password(**config)

def _build_b2c_authority(self, policy):
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
return base + "/" + policy # We do not support base + "?p=" + policy
Expand Down