Skip to content

Commit c8abd3a

Browse files
committed
Move ROPC from PCA to base
1 parent 093db3b commit c8abd3a

File tree

3 files changed

+119
-88
lines changed

3 files changed

+119
-88
lines changed

msal/application.py

Lines changed: 85 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,91 @@ def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
10251025
telemetry_context.update_telemetry(response)
10261026
return response
10271027

1028+
def acquire_token_by_username_password(
1029+
self, username, password, scopes, claims_challenge=None, **kwargs):
1030+
"""Gets a token for a given resource via user credentials.
1031+
1032+
See this page for constraints of Username Password Flow.
1033+
https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
1034+
1035+
:param str username: Typically a UPN in the form of an email address.
1036+
:param str password: The password.
1037+
:param list[str] scopes:
1038+
Scopes requested to access a protected API (a resource).
1039+
:param claims_challenge:
1040+
The claims_challenge parameter requests specific claims requested by the resource provider
1041+
in the form of a claims_challenge directive in the www-authenticate header to be
1042+
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1043+
It is a string of a JSON object which contains lists of claims being requested from these locations.
1044+
1045+
:return: A dict representing the json response from AAD:
1046+
1047+
- A successful response would contain "access_token" key,
1048+
- an error response would contain "error" and usually "error_description".
1049+
"""
1050+
scopes = decorate_scope(scopes, self.client_id)
1051+
telemetry_context = self._build_telemetry_context(
1052+
self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
1053+
headers = telemetry_context.generate_headers()
1054+
data = dict(
1055+
kwargs.pop("data", {}),
1056+
claims=_merge_claims_challenge_and_capabilities(
1057+
self._client_capabilities, claims_challenge))
1058+
if not self.authority.is_adfs:
1059+
user_realm_result = self.authority.user_realm_discovery(
1060+
username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
1061+
if user_realm_result.get("account_type") == "Federated":
1062+
response = _clean_up(self._acquire_token_by_username_password_federated(
1063+
user_realm_result, username, password, scopes=scopes,
1064+
data=data,
1065+
headers=headers, **kwargs))
1066+
telemetry_context.update_telemetry(response)
1067+
return response
1068+
response = _clean_up(self.client.obtain_token_by_username_password(
1069+
username, password, scope=scopes,
1070+
headers=headers,
1071+
data=data,
1072+
**kwargs))
1073+
telemetry_context.update_telemetry(response)
1074+
return response
1075+
1076+
def _acquire_token_by_username_password_federated(
1077+
self, user_realm_result, username, password, scopes=None, **kwargs):
1078+
wstrust_endpoint = {}
1079+
if user_realm_result.get("federation_metadata_url"):
1080+
wstrust_endpoint = mex_send_request(
1081+
user_realm_result["federation_metadata_url"],
1082+
self.http_client)
1083+
if wstrust_endpoint is None:
1084+
raise ValueError("Unable to find wstrust endpoint from MEX. "
1085+
"This typically happens when attempting MSA accounts. "
1086+
"More details available here. "
1087+
"https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
1088+
logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
1089+
wstrust_result = wst_send_request(
1090+
username, password,
1091+
user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
1092+
wstrust_endpoint.get("address",
1093+
# Fallback to an AAD supplied endpoint
1094+
user_realm_result.get("federation_active_auth_url")),
1095+
wstrust_endpoint.get("action"), self.http_client)
1096+
if not ("token" in wstrust_result and "type" in wstrust_result):
1097+
raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
1098+
GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
1099+
grant_type = {
1100+
SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
1101+
SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
1102+
WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
1103+
WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
1104+
}.get(wstrust_result.get("type"))
1105+
if not grant_type:
1106+
raise RuntimeError(
1107+
"RSTR returned unknown token type: %s", wstrust_result.get("type"))
1108+
self.client.grant_assertion_encoders.setdefault( # Register a non-standard type
1109+
grant_type, self.client.encode_saml_assertion)
1110+
return self.client.obtain_token_by_assertion(
1111+
wstrust_result["token"], grant_type, scope=scopes, **kwargs)
1112+
10281113

10291114
class PublicClientApplication(ClientApplication): # browser app or mobile app
10301115

@@ -1176,91 +1261,6 @@ def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
11761261
telemetry_context.update_telemetry(response)
11771262
return response
11781263

1179-
def acquire_token_by_username_password(
1180-
self, username, password, scopes, claims_challenge=None, **kwargs):
1181-
"""Gets a token for a given resource via user credentials.
1182-
1183-
See this page for constraints of Username Password Flow.
1184-
https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
1185-
1186-
:param str username: Typically a UPN in the form of an email address.
1187-
:param str password: The password.
1188-
:param list[str] scopes:
1189-
Scopes requested to access a protected API (a resource).
1190-
:param claims_challenge:
1191-
The claims_challenge parameter requests specific claims requested by the resource provider
1192-
in the form of a claims_challenge directive in the www-authenticate header to be
1193-
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1194-
It is a string of a JSON object which contains lists of claims being requested from these locations.
1195-
1196-
:return: A dict representing the json response from AAD:
1197-
1198-
- A successful response would contain "access_token" key,
1199-
- an error response would contain "error" and usually "error_description".
1200-
"""
1201-
scopes = decorate_scope(scopes, self.client_id)
1202-
telemetry_context = self._build_telemetry_context(
1203-
self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
1204-
headers = telemetry_context.generate_headers()
1205-
data = dict(
1206-
kwargs.pop("data", {}),
1207-
claims=_merge_claims_challenge_and_capabilities(
1208-
self._client_capabilities, claims_challenge))
1209-
if not self.authority.is_adfs:
1210-
user_realm_result = self.authority.user_realm_discovery(
1211-
username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
1212-
if user_realm_result.get("account_type") == "Federated":
1213-
response = _clean_up(self._acquire_token_by_username_password_federated(
1214-
user_realm_result, username, password, scopes=scopes,
1215-
data=data,
1216-
headers=headers, **kwargs))
1217-
telemetry_context.update_telemetry(response)
1218-
return response
1219-
response = _clean_up(self.client.obtain_token_by_username_password(
1220-
username, password, scope=scopes,
1221-
headers=headers,
1222-
data=data,
1223-
**kwargs))
1224-
telemetry_context.update_telemetry(response)
1225-
return response
1226-
1227-
def _acquire_token_by_username_password_federated(
1228-
self, user_realm_result, username, password, scopes=None, **kwargs):
1229-
wstrust_endpoint = {}
1230-
if user_realm_result.get("federation_metadata_url"):
1231-
wstrust_endpoint = mex_send_request(
1232-
user_realm_result["federation_metadata_url"],
1233-
self.http_client)
1234-
if wstrust_endpoint is None:
1235-
raise ValueError("Unable to find wstrust endpoint from MEX. "
1236-
"This typically happens when attempting MSA accounts. "
1237-
"More details available here. "
1238-
"https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
1239-
logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
1240-
wstrust_result = wst_send_request(
1241-
username, password,
1242-
user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
1243-
wstrust_endpoint.get("address",
1244-
# Fallback to an AAD supplied endpoint
1245-
user_realm_result.get("federation_active_auth_url")),
1246-
wstrust_endpoint.get("action"), self.http_client)
1247-
if not ("token" in wstrust_result and "type" in wstrust_result):
1248-
raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
1249-
GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
1250-
grant_type = {
1251-
SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
1252-
SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
1253-
WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
1254-
WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
1255-
}.get(wstrust_result.get("type"))
1256-
if not grant_type:
1257-
raise RuntimeError(
1258-
"RSTR returned unknown token type: %s", wstrust_result.get("type"))
1259-
self.client.grant_assertion_encoders.setdefault( # Register a non-standard type
1260-
grant_type, self.client.encode_saml_assertion)
1261-
return self.client.obtain_token_by_assertion(
1262-
wstrust_result["token"], grant_type, scope=scopes, **kwargs)
1263-
12641264

12651265
class ConfidentialClientApplication(ClientApplication): # server-side web app
12661266

sample/username_password_sample.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
config = json.load(open(sys.argv[1]))
3535

3636
# Create a preferably long-lived app instance which maintains a token cache.
37-
app = msal.PublicClientApplication(
37+
app = msal.ClientApplication(
3838
config["client_id"], authority=config["authority"],
39+
client_credential=config.get("client_secret"),
3940
# token_cache=... # Default cache is in memory only.
4041
# You can learn how to use SerializableTokenCache from
4142
# https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache

tests/test_e2e.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,12 @@ def assertCacheWorksForApp(self, result_from_wire, scope):
102102

103103
def _test_username_password(self,
104104
authority=None, client_id=None, username=None, password=None, scope=None,
105+
client_secret=None, # Since MSAL 1.11, confidential client has ROPC too
105106
**ignored):
106107
assert authority and client_id and username and password and scope
107-
self.app = msal.PublicClientApplication(
108-
client_id, authority=authority, http_client=MinimalHttpClient())
108+
self.app = msal.ClientApplication(
109+
client_id, authority=authority, http_client=MinimalHttpClient(),
110+
client_credential=client_secret)
109111
result = self.app.acquire_token_by_username_password(
110112
username, password, scopes=scope)
111113
self.assertLoosely(result)
@@ -650,6 +652,34 @@ def test_acquire_token_obo(self):
650652

651653
self._test_acquire_token_obo(config_pca, config_cca)
652654

655+
def test_acquire_token_by_client_secret(self):
656+
# This is copied from ArlingtonCloudTestCase's same test case
657+
try:
658+
config = self.get_lab_user(usertype="cloud", publicClient="no")
659+
except requests.exceptions.HTTPError:
660+
self.skipTest("The lab does not provide confidential app for testing")
661+
else:
662+
config["client_secret"] = self.get_lab_user_secret("TBD") # TODO
663+
self._test_acquire_token_by_client_secret(**config)
664+
665+
@unittest.skipUnless(
666+
os.getenv("LAB_OBO_CLIENT_SECRET"),
667+
"Need LAB_OBO_CLIENT_SECRET from https://aka.ms/GetLabSecret?Secret=TodoListServiceV2-OBO")
668+
@unittest.skipUnless(
669+
os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID"),
670+
"Need LAB_OBO_CONFIDENTIAL_CLIENT_ID from https://docs.msidlab.com/flows/onbehalfofflow.html")
671+
def test_confidential_client_acquire_token_by_username_password(self):
672+
# This approach won't work:
673+
# config = self.get_lab_user(usertype="cloud", publicClient="no")
674+
# so we repurpose the obo confidential app to test ROPC
675+
config = self.get_lab_user(usertype="cloud")
676+
config["password"] = self.get_lab_user_secret(config["lab_name"])
677+
# Swap in the OBO confidential app
678+
config["client_id"] = os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID")
679+
config["scope"] = ["https://graph.microsoft.com/.default"]
680+
config["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET")
681+
self._test_username_password(**config)
682+
653683
def _build_b2c_authority(self, policy):
654684
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
655685
return base + "/" + policy # We do not support base + "?p=" + policy

0 commit comments

Comments
 (0)