Skip to content

New initialize_auth_code_flow() and acquire_token_by_auth_code_flow() #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,79 @@ def _build_client(self, client_credential, authority):
on_removing_rt=self.token_cache.remove_rt,
on_updating_rt=self.token_cache.update_rt)

def initiate_auth_code_flow(
self,
scopes, # type: list[str]
redirect_uri=None,
state=None, # Recommended by OAuth2 for CSRF protection
prompt=None,
login_hint=None, # type: Optional[str]
domain_hint=None, # type: Optional[str]
claims_challenge=None,
):
"""Initiate an auth code flow.

Later when the response reaches your redirect_uri,
you can use :func:`~acquire_token_by_auth_code_flow()`
to complete the authentication/authorization.

:param list scope:
It is a list of case-sensitive strings.
Some ID provider can accept empty string to represent default scope.
:param str redirect_uri:
Optional. If not specified, server will use the pre-registered one.
:param str state:
An opaque value used by the client to
maintain state between the request and callback.
If absent, this library will automatically generate one internally.
:param str prompt:
By default, no prompt value will be sent, not even "none".
You will have to specify a value explicitly.
Its valid values are defined in Open ID Connect specs
https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
:param str login_hint:
Optional. Identifier of the user. Generally a User Principal Name (UPN).
:param domain_hint:
Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
If included, it will skip the email-based discovery process that user goes
through on the sign-in page, leading to a slightly more streamlined user experience.
More information on possible values
`here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
`here <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

:return:
The auth code flow. It is a dict in this form::

{
"auth_uri": "https://...", // Guide user to visit this
"state": "...", // You may choose to verify it by yourself,
// or just let acquire_token_by_auth_code_flow()
// do that for you.
"...": "...", // Everything else are reserved and internal
}

The caller is expected to::

1. somehow store this content, typically inside the current session,
2. guide the end user (i.e. resource owner) to visit that auth_uri,
3. and then relay this dict and subsequent auth response to
:func:`~acquire_token_by_auth_code_flow()`.
"""
client = Client(
{"authorization_endpoint": self.authority.authorization_endpoint},
self.client_id,
http_client=self.http_client)
flow = client.initiate_auth_code_flow(
redirect_uri=redirect_uri, state=state, login_hint=login_hint,
prompt=prompt,
scope=decorate_scope(scopes, self.client_id),
domain_hint=domain_hint,
claims=_merge_claims_challenge_and_capabilities(
self._client_capabilities, claims_challenge),
)
flow["claims_challenge"] = claims_challenge
return flow

def get_authorization_request_url(
self,
scopes, # type: list[str]
Expand Down Expand Up @@ -386,6 +459,73 @@ def get_authorization_request_url(
self._client_capabilities, claims_challenge),
)

def acquire_token_by_auth_code_flow(
self, auth_code_flow, auth_response, scopes=None, **kwargs):
"""Validate the auth response being redirected back, and obtain tokens.

It automatically provides nonce protection.

:param dict auth_code_flow:
The same dict returned by :func:`~initiate_auth_code_flow()`.
:param dict auth_response:
A dict of the query string received from auth server.
:param list[str] scopes:
Scopes requested to access a protected API (a resource).

Most of the time, you can leave it empty.

If you requested user consent for multiple resources, here you will
need to provide a subset of what you required in
:func:`~initiate_auth_code_flow()`.

OAuth2 was designed mostly for singleton services,
where tokens are always meant for the same resource and the only
changes are in the scopes.
In AAD, tokens can be issued for multiple 3rd party resources.
You can ask authorization code for multiple resources,
but when you redeem it, the token is for only one intended
recipient, called audience.
So the developer need to specify a scope so that we can restrict the
token to be issued for the corresponding audience.

:return:
* A dict containing "access_token" and/or "id_token", among others,
depends on what scope was used.
(See https://tools.ietf.org/html/rfc6749#section-5.1)
* A dict containing "error", optionally "error_description", "error_uri".
(It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
* Most client-side data error would result in ValueError exception.
So the usage pattern could be without any protocol details::

def authorize(): # A controller in a web app
try:
result = msal_app.acquire_token_by_auth_code_flow(
session.get("flow", {}), request.args)
if "error" in result:
return render_template("error.html", result)
use(result) # Token(s) are available in result and cache
except ValueError: # Usually caused by CSRF
pass # Simply ignore them
return redirect(url_for("index"))
"""
self._validate_ssh_cert_input_data(kwargs.get("data", {}))
return self.client.obtain_token_by_auth_code_flow(
auth_code_flow,
auth_response,
scope=decorate_scope(scopes, self.client_id) if scopes else None,
headers={
CLIENT_REQUEST_ID: _get_new_correlation_id(),
CLIENT_CURRENT_TELEMETRY: _build_current_telemetry_request_header(
self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID),
},
data=dict(
kwargs.pop("data", {}),
claims=_merge_claims_challenge_and_capabilities(
self._client_capabilities,
auth_code_flow.pop("claims_challenge", None))),
**kwargs)

def acquire_token_by_authorization_code(
self,
code,
Expand Down
31 changes: 30 additions & 1 deletion msal/oauth2cli/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import functools
import random
import string
import hashlib

import requests

Expand Down Expand Up @@ -258,6 +259,21 @@ def _stringify(self, sequence):
return sequence # as-is


def _generate_pkce_code_verifier(length=43):
assert 43 <= length <= 128
verifier = "".join( # https://tools.ietf.org/html/rfc7636#section-4.1
random.sample(string.ascii_letters + string.digits + "-._~", length))
code_challenge = (
# https://tools.ietf.org/html/rfc7636#section-4.2
base64.urlsafe_b64encode(hashlib.sha256(verifier.encode("ascii")).digest())
.rstrip(b"=")) # Required by https://tools.ietf.org/html/rfc7636#section-3
return {
"code_verifier": verifier,
"transformation": "S256", # In Python, sha256 is always available
"code_challenge": code_challenge,
}


class Client(BaseClient): # We choose to implement all 4 grants in 1 class
"""This is the main API for oauth2 client.

Expand Down Expand Up @@ -401,6 +417,8 @@ def initiate_auth_code_flow(
you can use :func:`~obtain_token_by_auth_code_flow()`
to complete the authentication/authorization.

This method also provides PKCE protection automatically.

:param list scope:
It is a list of case-sensitive strings.
Some ID provider can accept empty string to represent default scope.
Expand Down Expand Up @@ -440,14 +458,19 @@ def initiate_auth_code_flow(
# Implicit grant would cause auth response coming back in #fragment,
# but fragment won't reach a web service.
raise ValueError('response_type="token ..." is not allowed')
pkce = _generate_pkce_code_verifier()
flow = { # These data are required by obtain_token_by_auth_code_flow()
"state": state or "".join(random.sample(string.ascii_letters, 16)),
"redirect_uri": redirect_uri,
"scope": scope,
}
auth_uri = self._build_auth_request_uri(
response_type, **dict(flow, **kwargs))
response_type,
code_challenge=pkce["code_challenge"],
code_challenge_method=pkce["transformation"],
**dict(flow, **kwargs))
flow["auth_uri"] = auth_uri
flow["code_verifier"] = pkce["code_verifier"]
return flow

def obtain_token_by_auth_code_flow(
Expand All @@ -459,6 +482,8 @@ def obtain_token_by_auth_code_flow(
"""With the auth_response being redirected back,
validate it against auth_code_flow, and then obtain tokens.

Internally, it implements PKCE to mitigate the auth code interception attack.

:param dict auth_code_flow:
The same dict returned by :func:`~initiate_auth_code_flow()`.
:param dict auth_response:
Expand Down Expand Up @@ -513,6 +538,10 @@ def authorize(): # A controller in a web app
# It is both unnecessary and harmless, per RFC 6749.
# We use the same scope already used in auth request uri,
# thus token cache can know what scope the tokens are for.
data=dict( # Extract and update the data
kwargs.pop("data", {}),
code_verifier=auth_code_flow["code_verifier"],
),
**kwargs)
if auth_response.get("error"): # It means the first leg encountered error
# Here we do NOT return original auth_response as-is, to prevent a
Expand Down
7 changes: 4 additions & 3 deletions msal/oauth2cli/oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ def initiate_auth_code_flow(
return flow

def obtain_token_by_auth_code_flow(self, auth_code_flow, auth_response, **kwargs):
"""Validate the auth_response being redirected back, and then obtain tokens.
and obtain ID token which can be used for user sign in.
"""Validate the auth_response being redirected back, and then obtain tokens,
including ID token which can be used for user sign in.

It provides nonce protection out-of-the-box.
Internally, it implements nonce to mitigate replay attack.
It also implements PKCE to mitigate the auth code interception attack.

See :func:`oauth2.Client.obtain_token_by_auth_code_flow` in parent class
for descriptions on other parameters and return value.
Expand Down
67 changes: 65 additions & 2 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import msal
from tests.http_client import MinimalHttpClient
from msal.oauth2cli import AuthCodeReceiver

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -297,14 +298,16 @@ def get_lab_app(

Get it from environment variables if defined, otherwise fall back to use MSI.
"""
logger.info(
"Reading ENV variables %s and %s for lab app defined at "
"https://docs.msidlab.com/accounts/confidentialclient.html",
env_client_id, env_client_secret)
if os.getenv(env_client_id) and os.getenv(env_client_secret):
# A shortcut mainly for running tests on developer's local development machine
# or it could be setup on Travis CI
# https://docs.travis-ci.com/user/environment-variables/#defining-variables-in-repository-settings
# Data came from here
# https://docs.msidlab.com/accounts/confidentialclient.html
logger.info("Using lab app defined by ENV variables %s and %s",
env_client_id, env_client_secret)
client_id = os.getenv(env_client_id)
client_secret = os.getenv(env_client_secret)
else:
Expand Down Expand Up @@ -399,6 +402,45 @@ def _test_acquire_token_by_auth_code(
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scope, username=None)

def _test_acquire_token_by_auth_code_flow(
self, client_id=None, authority=None, port=None, scope=None,
username_uri="", # But you would want to provide one
**ignored):
assert client_id and authority and scope
self.app = msal.ClientApplication(
client_id, authority=authority, http_client=MinimalHttpClient())
with AuthCodeReceiver(port=port) as receiver:
flow = self.app.initiate_auth_code_flow(
redirect_uri="http://localhost:%d" % receiver.get_port(),
scopes=scope,
)
auth_response = receiver.get_auth_response(
auth_uri=flow["auth_uri"], state=flow["state"], timeout=60,
welcome_template="""<html><body><h1>{id}</h1><ol>
<li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
<li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
(replace the lab name with the labName from the link above).</li>
<li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
</ol></body></html>""".format(id=self.id(), username_uri=username_uri),
)
self.assertIsNotNone(
auth_response.get("code"), "Error: {}, Detail: {}".format(
auth_response.get("error"), auth_response))
result = self.app.acquire_token_by_auth_code_flow(flow, auth_response)
logger.debug(
"%s: cache = %s, id_token_claims = %s",
self.id(),
json.dumps(self.app.token_cache._cache, indent=4),
json.dumps(result.get("id_token_claims"), indent=4),
)
self.assertIn(
"access_token", result,
"{error}: {error_description}".format(
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scope, username=None)

def _test_acquire_token_obo(self, config_pca, config_cca):
# 1. An app obtains a token representing a user, for our mid-tier service
pca = msal.PublicClientApplication(
Expand Down Expand Up @@ -500,6 +542,16 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self):
config["port"] = 8080
self._test_acquire_token_by_auth_code(**config)

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_adfs2019_onprem_acquire_token_by_auth_code_flow(self):
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["scope"] = self.adfs2019_scopes
config["port"] = 8080
self._test_acquire_token_by_auth_code_flow(
username_uri="https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019",
**config)

@unittest.skipUnless(
os.getenv("LAB_OBO_CLIENT_SECRET"),
"Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56")
Expand Down Expand Up @@ -547,6 +599,17 @@ def test_b2c_acquire_token_by_auth_code(self):
scope=config["defaultScopes"].split(','),
)

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_b2c_acquire_token_by_auth_code_flow(self):
config = self.get_lab_app_object(azureenvironment="azureb2ccloud")
self._test_acquire_token_by_auth_code_flow(
authority=self._build_b2c_authority("B2C_1_SignInPolicy"),
client_id=config["appId"],
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000]
scope=config["defaultScopes"].split(','),
username_uri="https://msidlab.com/api/user?usertype=b2c&b2cprovider=local",
)

def test_b2c_acquire_token_by_ropc(self):
config = self.get_lab_app_object(azureenvironment="azureb2ccloud")
self._test_username_password(
Expand Down