Skip to content

Implement instance_discovery=False #496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 63 additions & 15 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from .oauth2cli import Client, JwtAssertionCreator
from .oauth2cli.oidc import decode_part
from .authority import Authority
from .authority import Authority, WORLD_WIDE
from .mex import send_request as mex_send_request
from .wstrust_request import send_request as wst_send_request
from .wstrust_response import *
Expand Down Expand Up @@ -146,7 +146,6 @@ def obtain_token_by_username_password(self, username, password, **kwargs):


class ClientApplication(object):

ACQUIRE_TOKEN_SILENT_ID = "84"
ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
Expand Down Expand Up @@ -174,6 +173,7 @@ def __init__(
# when we would eventually want to add this feature to PCA in future.
exclude_scopes=None,
http_cache=None,
instance_discovery=None,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is instance_discovery=None here means default to False?

I guess we want the default value to be true?

Copy link
Collaborator Author

@rayluo rayluo Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, I think I am moving away from using True or False as the default value, because they would force downstream yet mid-tier libraries (such as Azure Identity) to also duplicate the same default value in their corresponding API surface.

I would instead use None as default value, and it just means default. It does NOT imply a boolean evaluation to make it a False.

And the actual default behavior will and only will be available via documentation. See line 419 in this PR, or read the rendered docs in the staging environment.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the caller (me :)) can specify None as the input value of instance_discovery to say "use default"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes :-)

):
"""Create an instance of application.

Expand Down Expand Up @@ -409,11 +409,40 @@ def __init__(
Personally Identifiable Information (PII). Encryption is unnecessary.

New in version 1.16.0.

:param boolean instance_discovery:
Historically, MSAL would connect to a central endpoint located at
``https://login.microsoftonline.com`` to acquire some metadata,
especially when using an unfamiliar authority.
This behavior is known as Instance Discovery.

This parameter defaults to None, which enables the Instance Discovery.

If you know some authorities which you allow MSAL to operate with as-is,
without involving any Instance Discovery, the recommended pattern is::

known_authorities = frozenset([ # Treat your known authorities as const
"https://contoso.com/adfs", "https://login.azs/foo"])
...
authority = "https://contoso.com/adfs" # Assuming your app will use this
app1 = PublicClientApplication(
"client_id",
authority=authority,
# Conditionally disable Instance Discovery for known authorities
instance_discovery=authority not in known_authorities,
)

If you do not know some authorities beforehand,
yet still want MSAL to accept any authority that you will provide,
you can use a ``False`` to unconditionally disable Instance Discovery.

New in version 1.19.0.
"""
self.client_id = client_id
self.client_credential = client_credential
self.client_claims = client_claims
self._client_capabilities = client_capabilities
self._instance_discovery = instance_discovery

if exclude_scopes and not isinstance(exclude_scopes, list):
raise ValueError(
Expand Down Expand Up @@ -453,18 +482,24 @@ def __init__(

# Here the self.authority will not be the same type as authority in input
try:
authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
self.authority = Authority(
authority or "https://login.microsoftonline.com/common/",
self.http_client, validate_authority=validate_authority)
authority_to_use,
self.http_client,
validate_authority=validate_authority,
instance_discovery=self._instance_discovery,
)
except ValueError: # Those are explicit authority validation errors
raise
except Exception: # The rest are typically connection errors
if validate_authority and azure_region:
# Since caller opts in to use region, here we tolerate connection
# errors happened during authority validation at non-region endpoint
self.authority = Authority(
authority or "https://login.microsoftonline.com/common/",
self.http_client, validate_authority=False)
authority_to_use,
self.http_client,
instance_discovery=False,
)
else:
raise

Expand Down Expand Up @@ -534,10 +569,11 @@ def _get_regional_authority(self, central_authority):
"sts.windows.net",
)
else "{}.{}".format(region_to_use, central_authority.instance))
return Authority(
return Authority( # The central_authority has already been validated
"https://{}/{}".format(regional_host, central_authority.tenant),
self.http_client,
validate_authority=False) # The central_authority has already been validated
instance_discovery=False,
)
return None

def _build_client(self, client_credential, authority, skip_regional_client=False):
Expand Down Expand Up @@ -789,7 +825,8 @@ def get_authorization_request_url(
# Multi-tenant app can use new authority on demand
the_authority = Authority(
authority,
self.http_client
self.http_client,
instance_discovery=self._instance_discovery,
) if authority else self.authority

client = _ClientWithCcsRoutingInfo(
Expand Down Expand Up @@ -1012,14 +1049,23 @@ def _find_msal_accounts(self, environment):
}
return list(grouped_accounts.values())

def _get_instance_metadata(self): # This exists so it can be mocked in unit test
resp = self.http_client.get(
"https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint
headers={'Accept': 'application/json'})
resp.raise_for_status()
return json.loads(resp.text)['metadata']

def _get_authority_aliases(self, instance):
if self._instance_discovery is False:
return []
if self.authority._is_known_to_developer:
# Then it is an ADFS/B2C/known_authority_hosts situation
# which may not reach the central endpoint, so we skip it.
return []
if not self.authority_groups:
resp = self.http_client.get(
"https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize",
headers={'Accept': 'application/json'})
resp.raise_for_status()
self.authority_groups = [
set(group['aliases']) for group in json.loads(resp.text)['metadata']]
set(group['aliases']) for group in self._get_instance_metadata()]
for group in self.authority_groups:
if instance in group:
return [alias for alias in group if alias != instance]
Expand Down Expand Up @@ -1168,6 +1214,7 @@ def acquire_token_silent_with_error(
# the_authority = Authority(
# authority,
# self.http_client,
# instance_discovery=self._instance_discovery,
# ) if authority else self.authority
result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
scopes, account, self.authority, force_refresh=force_refresh,
Expand All @@ -1189,7 +1236,8 @@ def acquire_token_silent_with_error(
the_authority = Authority(
"https://" + alias + "/" + self.authority.tenant,
self.http_client,
validate_authority=False)
instance_discovery=False,
)
result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
scopes, account, the_authority, force_refresh=force_refresh,
claims_challenge=claims_challenge,
Expand Down
44 changes: 29 additions & 15 deletions msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ def http_client(self): # Obsolete. We will remove this eventually
"authority.http_client might be removed in MSAL Python 1.21+", DeprecationWarning)
return self._http_client

def __init__(self, authority_url, http_client, validate_authority=True):
def __init__(
self, authority_url, http_client,
validate_authority=True,
instance_discovery=None,
):
"""Creates an authority instance, and also validates it.

:param validate_authority:
Expand All @@ -67,19 +71,34 @@ def __init__(self, authority_url, http_client, validate_authority=True):
This parameter only controls whether an instance discovery will be
performed.
"""
# :param instance_discovery:
# By default, the known-to-Microsoft validation will use an
# instance discovery endpoint located at ``login.microsoftonline.com``.
# You can customize the endpoint by providing a url as a string.
# Or you can turn this behavior off by passing in a False here.
self._http_client = http_client
if isinstance(authority_url, AuthorityBuilder):
authority_url = str(authority_url)
authority, self.instance, tenant = canonicalize(authority_url)
self.is_adfs = tenant.lower() == 'adfs'
parts = authority.path.split('/')
is_b2c = any(self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS) or (
len(parts) == 3 and parts[2].lower().startswith("b2c_"))
if (tenant != "adfs" and (not is_b2c) and validate_authority
and self.instance not in WELL_KNOWN_AUTHORITY_HOSTS):
payload = instance_discovery(
is_b2c = any(
self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
self._is_known_to_developer = self.is_adfs or is_b2c or not validate_authority
is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format( # Note: This URL seemingly returns V1 endpoint only
WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too
# See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
# and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
) if instance_discovery in (None, True) else instance_discovery
if instance_discovery_endpoint and not (
is_known_to_microsoft or self._is_known_to_developer):
payload = _instance_discovery(
"https://{}{}/oauth2/v2.0/authorize".format(
self.instance, authority.path),
self._http_client)
self._http_client,
instance_discovery_endpoint)
if payload.get("error") == "invalid_instance":
raise ValueError(
"invalid_instance: "
Expand Down Expand Up @@ -113,7 +132,6 @@ def __init__(self, authority_url, http_client, validate_authority=True):
self.token_endpoint = openid_config['token_endpoint']
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
self.is_adfs = self.tenant.lower() == 'adfs'

def user_realm_discovery(self, username, correlation_id=None, response=None):
# It will typically return a dict containing "ver", "account_type",
Expand Down Expand Up @@ -145,13 +163,9 @@ def canonicalize(authority_url):
% authority_url)
return authority, authority.hostname, parts[1]

def instance_discovery(url, http_client, **kwargs):
resp = http_client.get( # Note: This URL seemingly returns V1 endpoint only
'https://{}/common/discovery/instance'.format(
WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too
# See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
# and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
),
def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
resp = http_client.get(
instance_discovery_endpoint,
params={'authorization_endpoint': url, 'api-version': '1.0'},
**kwargs)
return json.loads(resp.text)
Expand Down
66 changes: 66 additions & 0 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import os
try:
from unittest.mock import patch
except:
from mock import patch

import msal
from msal.authority import *
from tests import unittest
from tests.http_client import MinimalHttpClient
Expand Down Expand Up @@ -123,3 +128,64 @@ class MockResponse(object):
finally: # MUST NOT let the previous test changes affect other test cases
Authority._domains_without_user_realm_discovery = set([])


@patch("msal.authority.tenant_discovery", return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
})
@patch("msal.authority._instance_discovery")
@patch.object(msal.ClientApplication, "_get_instance_metadata", return_value=[])
class TestMsalBehaviorsWithoutAndWithInstanceDiscoveryBoolean(unittest.TestCase):
"""Test cases use ClientApplication, which is a base class of both PCA and CCA"""

def test_by_default_a_known_to_microsoft_authority_should_skip_validation_but_still_use_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
app = msal.ClientApplication("id", authority="https://login.microsoftonline.com/common")
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_called_once_with()

def test_validate_authority_boolean_should_skip_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
"""Pending deprecation, but kept for backward compatibility, for now"""
app = msal.ClientApplication(
"id", authority="https://contoso.com/common", validate_authority=False)
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()

def test_by_default_adfs_should_skip_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
"""Not strictly required, but when/if we already supported it, we better keep it"""
app = msal.ClientApplication("id", authority="https://contoso.com/adfs")
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()

def test_by_default_b2c_should_skip_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
"""Not strictly required, but when/if we already supported it, we better keep it"""
app = msal.ClientApplication(
"id", authority="https://login.b2clogin.com/contoso/b2c_policy")
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()

def test_turning_off_instance_discovery_should_work_for_all_kinds_of_clouds(
self, instance_metadata, known_to_microsoft_validation, _):
for authority in [
"https://login.microsoftonline.com/common", # Known to Microsoft
"https://contoso.com/adfs", # ADFS
"https://login.b2clogin.com/contoso/b2c_policy", # B2C
"https://private.cloud/foo", # Private Cloud
]:
self._test_turning_off_instance_discovery_should_skip_authority_validation_and_instance_metadata(
authority, instance_metadata, known_to_microsoft_validation)

def _test_turning_off_instance_discovery_should_skip_authority_validation_and_instance_metadata(
self, authority, instance_metadata, known_to_microsoft_validation):
app = msal.ClientApplication("id", authority=authority, instance_discovery=False)
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()