2
2
import json
3
3
import time
4
4
try : # Python 2
5
- from urlparse import urljoin
5
+ from urlparse import urljoin , urlparse
6
6
except : # Python 3
7
- from urllib .parse import urljoin
7
+ from urllib .parse import urljoin , urlparse
8
8
import logging
9
9
import sys
10
10
import warnings
13
13
14
14
from .oauth2cli import Client , JwtAssertionCreator
15
15
from .oauth2cli .oidc import decode_part
16
- from .authority import Authority
16
+ from .authority import Authority , WORLD_WIDE
17
17
from .mex import send_request as mex_send_request
18
18
from .wstrust_request import send_request as wst_send_request
19
19
from .wstrust_response import *
@@ -146,7 +146,6 @@ def obtain_token_by_username_password(self, username, password, **kwargs):
146
146
147
147
148
148
class ClientApplication (object ):
149
-
150
149
ACQUIRE_TOKEN_SILENT_ID = "84"
151
150
ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
152
151
ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
@@ -160,6 +159,48 @@ class ClientApplication(object):
160
159
161
160
ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect"
162
161
162
+ _known_authority_hosts = None
163
+
164
+ @classmethod
165
+ def set_known_authority_hosts (cls , known_authority_hosts ):
166
+ """Declare a list of hosts which you allow MSAL to operate with.
167
+
168
+ If your app operates with some authorities that you know and own,
169
+ such as some ADFS or B2C or private cloud,
170
+ it is recommended and sometimes required that you declare them here,
171
+ so that MSAL will use your authorities without discovery,
172
+ and reject most of the other undefined authorities.
173
+
174
+ ``known_authority_hosts`` is meant to be a static and per-deployment setting.
175
+ This classmethod shall be called at most once,
176
+ during your entire app's starting-up,
177
+ before your initializing any ``PublicClientApplication`` or
178
+ ``ConfidentialClientApplication`` instance(s).
179
+
180
+ :param list[str] known_authority_hosts:
181
+ Authorities that you known, for example::
182
+
183
+ [
184
+ "contoso.com", # Your own domain
185
+ "login.azs", # This can be a private cloud
186
+ ]
187
+
188
+ New in version 1.19
189
+ """
190
+ new_input = frozenset (known_authority_hosts )
191
+ if (cls ._known_authority_hosts is not None
192
+ and cls ._known_authority_hosts != new_input ):
193
+ raise ValueError (
194
+ "The known_authority_hosts are considered static. "
195
+ "Once configured, they should not be changed." )
196
+ cls ._known_authority_hosts = new_input
197
+ logger .debug ('known_authority_hosts is set to %s' , known_authority_hosts )
198
+
199
+ def _union_known_authority_hosts (cls , url = None , host = None ):
200
+ host = host if host else urlparse (url ).netloc .split (":" )[0 ]
201
+ return (cls ._known_authority_hosts .union ([host ])
202
+ if cls ._known_authority_hosts else frozenset ([host ]))
203
+
163
204
def __init__ (
164
205
self , client_id ,
165
206
client_credential = None , authority = None , validate_authority = True ,
@@ -453,18 +494,24 @@ def __init__(
453
494
454
495
# Here the self.authority will not be the same type as authority in input
455
496
try :
497
+ authority_to_use = authority or "https://{}/common/" .format (WORLD_WIDE )
456
498
self .authority = Authority (
457
- authority or "https://login.microsoftonline.com/common/" ,
458
- self .http_client , validate_authority = validate_authority )
499
+ authority_to_use ,
500
+ self .http_client , validate_authority = validate_authority ,
501
+ known_authority_hosts = self .__class__ ._known_authority_hosts ,
502
+ )
459
503
except ValueError : # Those are explicit authority validation errors
460
504
raise
461
505
except Exception : # The rest are typically connection errors
462
506
if validate_authority and azure_region :
463
507
# Since caller opts in to use region, here we tolerate connection
464
508
# errors happened during authority validation at non-region endpoint
465
509
self .authority = Authority (
466
- authority or "https://login.microsoftonline.com/common/" ,
467
- self .http_client , validate_authority = False )
510
+ authority_to_use ,
511
+ self .http_client ,
512
+ known_authority_hosts = self ._union_known_authority_hosts (
513
+ url = authority_to_use ),
514
+ )
468
515
else :
469
516
raise
470
517
@@ -534,10 +581,12 @@ def _get_regional_authority(self, central_authority):
534
581
"sts.windows.net" ,
535
582
)
536
583
else "{}.{}" .format (region_to_use , central_authority .instance ))
537
- return Authority (
584
+ return Authority ( # The central_authority has already been validated
538
585
"https://{}/{}" .format (regional_host , central_authority .tenant ),
539
586
self .http_client ,
540
- validate_authority = False ) # The central_authority has already been validated
587
+ known_authority_hosts = self ._union_known_authority_hosts (
588
+ host = regional_host ),
589
+ )
541
590
return None
542
591
543
592
def _build_client (self , client_credential , authority , skip_regional_client = False ):
@@ -789,7 +838,8 @@ def get_authorization_request_url(
789
838
# Multi-tenant app can use new authority on demand
790
839
the_authority = Authority (
791
840
authority ,
792
- self .http_client
841
+ self .http_client ,
842
+ known_authority_hosts = self .__class__ ._known_authority_hosts ,
793
843
) if authority else self .authority
794
844
795
845
client = _ClientWithCcsRoutingInfo (
@@ -1012,14 +1062,21 @@ def _find_msal_accounts(self, environment):
1012
1062
}
1013
1063
return list (grouped_accounts .values ())
1014
1064
1065
+ def _get_instance_metadata (self ): # This exists so it can be mocked in unit test
1066
+ resp = self .http_client .get (
1067
+ "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize" ,
1068
+ headers = {'Accept' : 'application/json' })
1069
+ resp .raise_for_status ()
1070
+ return json .loads (resp .text )['metadata' ]
1071
+
1015
1072
def _get_authority_aliases (self , instance ):
1073
+ if self .authority ._is_known_to_developer :
1074
+ # Then it is an ADFS/B2C/known_authority_hosts situation
1075
+ # which may not reach the central endpoint, so we skip it.
1076
+ return []
1016
1077
if not self .authority_groups :
1017
- resp = self .http_client .get (
1018
- "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize" ,
1019
- headers = {'Accept' : 'application/json' })
1020
- resp .raise_for_status ()
1021
1078
self .authority_groups = [
1022
- set (group ['aliases' ]) for group in json . loads ( resp . text )[ 'metadata' ] ]
1079
+ set (group ['aliases' ]) for group in self . _get_instance_metadata () ]
1023
1080
for group in self .authority_groups :
1024
1081
if instance in group :
1025
1082
return [alias for alias in group if alias != instance ]
@@ -1189,7 +1246,8 @@ def acquire_token_silent_with_error(
1189
1246
the_authority = Authority (
1190
1247
"https://" + alias + "/" + self .authority .tenant ,
1191
1248
self .http_client ,
1192
- validate_authority = False )
1249
+ known_authority_hosts = self ._union_known_authority_hosts (host = alias ),
1250
+ )
1193
1251
result = self ._acquire_token_silent_from_cache_and_possibly_refresh_it (
1194
1252
scopes , account , the_authority , force_refresh = force_refresh ,
1195
1253
claims_challenge = claims_challenge ,
0 commit comments