Skip to content

Implementing known_authority_hosts #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 69 additions & 17 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import json
import time
try: # Python 2
from urlparse import urljoin
from urlparse import urljoin, urlparse
except: # Python 3
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse
import logging
import sys
import warnings
Expand All @@ -13,7 +13,7 @@

from .oauth2cli import Client, JwtAssertionCreator
from .oauth2cli.oidc import decode_part
from .authority import Authority
from .authority import Authority, WORLD_WIDE
from .mex import send_request as mex_send_request
from .wstrust_request import send_request as wst_send_request
from .wstrust_response import *
Expand Down Expand Up @@ -146,7 +146,6 @@ def obtain_token_by_username_password(self, username, password, **kwargs):


class ClientApplication(object):

ACQUIRE_TOKEN_SILENT_ID = "84"
ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
Expand All @@ -160,6 +159,11 @@ class ClientApplication(object):

ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect"

def _union_known_authority_hosts(self, url=None, host=None):
host = host if host else urlparse(url).netloc.split(":")[0]
return (self._known_authority_hosts.union([host])
if self._known_authority_hosts else frozenset([host]))

def __init__(
self, client_id,
client_credential=None, authority=None, validate_authority=True,
Expand All @@ -174,6 +178,7 @@ def __init__(
# when we would eventually want to add this feature to PCA in future.
exclude_scopes=None,
http_cache=None,
known_authority_hosts=None,
):
"""Create an instance of application.

Expand Down Expand Up @@ -409,11 +414,41 @@ def __init__(
Personally Identifiable Information (PII). Encryption is unnecessary.

New in version 1.16.0.

:param list[str] known_authority_hosts:
Historically, MSAL would try to connect to a central endpoint
to acquire some metadata for an unfamiliar authority.
This behavior is known as Instance Discovery.

If you know a list of hosts which you allow MSAL to operate with,
you can declare them here, so that MSAL will use them as-is,
and also reject most of other authorities such as a call like this:
``ClientApplication("id", authority="https://undefined.com", known_authority_hosts=["contoso.com", "fabricam.com"])``.

Typically, the ADFS or B2C or private cloud authorities
is recommended and sometimes required to be declared here.

This is meant to be a static and per-deployment setting.
The recommended pattern is to load your predefined constant list
from a configuration file,
and never create new MSAL ``ClientApplication`` instances with
unknown and untrusted authorities during runtime.

Values would look like::

[
"contoso.com", # Your own domain
"login.azs", # This can be a private cloud
]

New in version 1.19
"""
self.client_id = client_id
self.client_credential = client_credential
self.client_claims = client_claims
self._client_capabilities = client_capabilities
self._known_authority_hosts = frozenset(
known_authority_hosts if known_authority_hosts else [])

if exclude_scopes and not isinstance(exclude_scopes, list):
raise ValueError(
Expand Down Expand Up @@ -453,18 +488,24 @@ def __init__(

# Here the self.authority will not be the same type as authority in input
try:
authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
self.authority = Authority(
authority or "https://login.microsoftonline.com/common/",
self.http_client, validate_authority=validate_authority)
authority_to_use,
self.http_client, validate_authority=validate_authority,
known_authority_hosts=self._known_authority_hosts,
)
except ValueError: # Those are explicit authority validation errors
raise
except Exception: # The rest are typically connection errors
if validate_authority and azure_region:
# Since caller opts in to use region, here we tolerate connection
# errors happened during authority validation at non-region endpoint
self.authority = Authority(
authority or "https://login.microsoftonline.com/common/",
self.http_client, validate_authority=False)
authority_to_use,
self.http_client,
known_authority_hosts=self._union_known_authority_hosts(
url=authority_to_use),
)
else:
raise

Expand Down Expand Up @@ -534,10 +575,12 @@ def _get_regional_authority(self, central_authority):
"sts.windows.net",
)
else "{}.{}".format(region_to_use, central_authority.instance))
return Authority(
return Authority( # The central_authority has already been validated
"https://{}/{}".format(regional_host, central_authority.tenant),
self.http_client,
validate_authority=False) # The central_authority has already been validated
known_authority_hosts=self._union_known_authority_hosts(
host=regional_host),
)
return None

def _build_client(self, client_credential, authority, skip_regional_client=False):
Expand Down Expand Up @@ -789,7 +832,8 @@ def get_authorization_request_url(
# Multi-tenant app can use new authority on demand
the_authority = Authority(
authority,
self.http_client
self.http_client,
known_authority_hosts=self._known_authority_hosts,
) if authority else self.authority

client = _ClientWithCcsRoutingInfo(
Expand Down Expand Up @@ -1012,14 +1056,21 @@ def _find_msal_accounts(self, environment):
}
return list(grouped_accounts.values())

def _get_instance_metadata(self): # This exists so it can be mocked in unit test
resp = self.http_client.get(
"https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize",
headers={'Accept': 'application/json'})
resp.raise_for_status()
return json.loads(resp.text)['metadata']

def _get_authority_aliases(self, instance):
if self.authority._is_known_to_developer:
# Then it is an ADFS/B2C/known_authority_hosts situation
# which may not reach the central endpoint, so we skip it.
return []
if not self.authority_groups:
resp = self.http_client.get(
"https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize",
headers={'Accept': 'application/json'})
resp.raise_for_status()
self.authority_groups = [
set(group['aliases']) for group in json.loads(resp.text)['metadata']]
set(group['aliases']) for group in self._get_instance_metadata()]
for group in self.authority_groups:
if instance in group:
return [alias for alias in group if alias != instance]
Expand Down Expand Up @@ -1189,7 +1240,8 @@ def acquire_token_silent_with_error(
the_authority = Authority(
"https://" + alias + "/" + self.authority.tenant,
self.http_client,
validate_authority=False)
known_authority_hosts=self._union_known_authority_hosts(host=alias),
)
result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
scopes, account, the_authority, force_refresh=force_refresh,
claims_challenge=claims_challenge,
Expand Down
24 changes: 18 additions & 6 deletions msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ def http_client(self): # Obsolete. We will remove this eventually
"authority.http_client might be removed in MSAL Python 1.21+", DeprecationWarning)
return self._http_client

def __init__(self, authority_url, http_client, validate_authority=True):
def __init__(
self, authority_url, http_client,
validate_authority=True,
known_authority_hosts=None, # Known-to-developer authority hosts
):
"""Creates an authority instance, and also validates it.

:param validate_authority:
Expand All @@ -67,15 +71,24 @@ def __init__(self, authority_url, http_client, validate_authority=True):
This parameter only controls whether an instance discovery will be
performed.
"""
if known_authority_hosts and not bool(validate_authority):
raise ValueError(
"Since you are using the new known_authority_hosts anyway, "
"you might as well avoid using validate_authority=False completely")
self._http_client = http_client
if isinstance(authority_url, AuthorityBuilder):
authority_url = str(authority_url)
authority, self.instance, tenant = canonicalize(authority_url)
self.is_adfs = tenant.lower() == 'adfs'
parts = authority.path.split('/')
is_b2c = any(self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS) or (
len(parts) == 3 and parts[2].lower().startswith("b2c_"))
if (tenant != "adfs" and (not is_b2c) and validate_authority
and self.instance not in WELL_KNOWN_AUTHORITY_HOSTS):
is_b2c = any(
self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
self._is_known_to_developer = (
self.instance in known_authority_hosts if known_authority_hosts
else (self.is_adfs or is_b2c or not validate_authority))
is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
if not (is_known_to_microsoft or self._is_known_to_developer):
payload = instance_discovery(
"https://{}{}/oauth2/v2.0/authorize".format(
self.instance, authority.path),
Expand Down Expand Up @@ -113,7 +126,6 @@ def __init__(self, authority_url, http_client, validate_authority=True):
self.token_endpoint = openid_config['token_endpoint']
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
self.is_adfs = self.tenant.lower() == 'adfs'

def user_realm_discovery(self, username, correlation_id=None, response=None):
# It will typically return a dict containing "ver", "account_type",
Expand Down
97 changes: 97 additions & 0 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import os
try:
from unittest.mock import patch, ANY
except:
from mock import patch, ANY

import msal
from msal.authority import *
from tests import unittest
from tests.http_client import MinimalHttpClient
Expand Down Expand Up @@ -123,3 +128,95 @@ class MockResponse(object):
finally: # MUST NOT let the previous test changes affect other test cases
Authority._domains_without_user_realm_discovery = set([])


@patch("msal.authority.tenant_discovery", return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
})
@patch("msal.authority.instance_discovery")
@patch.object(msal.ClientApplication, "_get_instance_metadata", return_value=[])
class TestMsalBehaviorsWithoutAndWithKnownAuthorityHosts(unittest.TestCase):
"""Test cases use ClientApplication, which is a base class of both PCA and CCA"""

def test_by_default_a_known_to_microsoft_authority_should_skip_validation_but_still_use_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
app = msal.ClientApplication("id", authority="https://login.microsoftonline.com/common")
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected in this case that get_accounts() does instance discovery? That is to say, ClientApplication() doesn't call known_to_microsoft_validation but get_accounts() does.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The answer is two-fold.

  • The "validating whether an input authority looks legit" and "getting some metadata for alias grouping" are two different things. This test case, as described in line 141, made a point that one does not necessarily turn off the other. In this case, ClientApplication() skips validation, but get_accounts() would like to know alias groups.
  • The actual timing of when to send out the metadata call, is an implementation detail. Since MSAL Python delays that metadata call to get_accounts(), so this test case is written this way. Your MSAL's implementation does not have to call get_accounts().

Last but not least, this entire PR has been shelved, because later we discovered some new requirements for Azure Stack scenarios (internal link), so we proceeded with #496.

instance_metadata.assert_called_once_with()

def test_validate_authority_boolean_should_skip_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
"""Pending deprecation, but kept for backward compatibility, for now"""
app = msal.ClientApplication(
"id", authority="https://contoso.com/common", validate_authority=False)
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()

def test_by_default_adfs_should_skip_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
"""Not strictly required, but when/if we already supported it, we better keep it"""
app = msal.ClientApplication("id", authority="https://contoso.com/adfs")
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()

def test_by_default_b2c_should_skip_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
"""Not strictly required, but when/if we already supported it, we better keep it"""
app = msal.ClientApplication(
"id", authority="https://login.b2clogin.com/contoso/b2c_policy")
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()

def test_known_authority_hosts_should_not_coexist_with_validate_authority_boolean(self, *mocks):
with self.assertRaises(ValueError):
msal.ClientApplication(
"id",
authority="https://unknown.com/common",
validate_authority=False, known_authority_hosts=["private.cloud"])

def test_known_to_developer_authority_should_skip_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
app = msal.ClientApplication(
"foo",
authority="https://private.cloud/foo",
known_authority_hosts=["private.cloud"])
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_not_called()

def test_known_to_developer_setting_should_still_let_a_known_to_microsoft_authority_skip_validation_and_use_instance_metadata( # i.e. same as the without-known-to-developer behavior
self, instance_metadata, known_to_microsoft_validation, _):
app = msal.ClientApplication(
"id",
authority="https://login.microsoftonline.com/common",
known_authority_hosts=["private.cloud"])
known_to_microsoft_validation.assert_not_called()
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_called_once_with()

def test_known_authorities_should_make_unknown_adfs_perform_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
app = msal.ClientApplication(
"foo",
authority="https://unknown.com/adfs",
known_authority_hosts=["private.cloud"])
known_to_microsoft_validation.assert_called_once_with(ANY, ANY) # Python 3.5+
# We effectively mocked a passed validation, so that MSAL will proceed
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_called_once_with()

def test_known_authorities_should_make_unknown_b2c_perform_validation_and_instance_metadata(
self, instance_metadata, known_to_microsoft_validation, _):
app = msal.ClientApplication(
"foo",
authority="https://b2clogin.com/contoso/b2c_policy",
known_authority_hosts=["private.cloud"])
known_to_microsoft_validation.assert_called_once_with(ANY, ANY) # Python 3.5+
# We effectively mocked a passed validation, so that MSAL will proceed
app.get_accounts() # This could make an instance metadata call for authority aliases
instance_metadata.assert_called_once_with()