|
1 | 1 | import time
|
2 | 2 |
|
3 |
| -from . import oauth2 |
4 | 3 | from .exceptions import MsalServiceError
|
5 | 4 |
|
6 | 5 |
|
7 | 6 | class BaseRequest(object):
|
8 |
| - TOKEN_ENDPOINT_PATH = 'oauth2/v2.0/token' |
9 | 7 |
|
10 | 8 | def __init__(
|
11 |
| - self, authority=None, token_cache=None, scope=None, policy="", |
| 9 | + self, authority=None, token_cache=None, |
| 10 | + scope=None, policy="", # TBD: If scope and policy are paramters |
| 11 | + # of both high level ClientApplication.acquire_token() |
| 12 | + # and low level oauth2.*Grant.get_token(), |
| 13 | + # shouldn't they be the parameters for run()? |
12 | 14 | client_id=None, client_credential=None, authenticator=None,
|
13 | 15 | support_adfs=False, restrict_to_single_user=False):
|
14 | 16 | if not scope:
|
15 | 17 | raise ValueError("scope cannot be empty")
|
16 | 18 | self.__dict__.update(locals())
|
17 | 19 |
|
| 20 | + # TODO: Temporary solution here |
| 21 | + self.token_endpoint = authority |
| 22 | + if authority.startswith('https://login.microsoftonline.com/common/'): |
| 23 | + self.token_endpoint += 'oauth2/v2.0/token' |
| 24 | + elif authority.startswith('https://login.windows.net/'): # AAD? |
| 25 | + self.token_endpoint += 'oauth2/token' |
| 26 | + if policy: |
| 27 | + self.token_endpoint += '?policy={}'.format(policy) |
| 28 | + |
18 | 29 | def run(self):
|
19 | 30 | """Returns a dictionary, which typically contains following keys:
|
20 | 31 |
|
@@ -55,54 +66,3 @@ def __timestamp(self, seconds_from_now=None): # Returns timestamp IN SECOND
|
55 | 66 | def get_token(self):
|
56 | 67 | raise NotImplemented("Use proper sub-class instead")
|
57 | 68 |
|
58 |
| - |
59 |
| -class ClientCredentialRequest(BaseRequest): |
60 |
| - def get_token(self): |
61 |
| - token_endpoint="%s%s?policy=%s" % ( |
62 |
| - self.authority, self.TOKEN_ENDPOINT_PATH, self.policy) |
63 |
| - if isinstance(self.client_credential, dict): # certification logic |
64 |
| - return ClientCredentialCertificateGrant( |
65 |
| - self.client_id, token_endpoint=token_endpoint |
66 |
| - ).get_token( |
67 |
| - self.client_credential['certificate'], |
68 |
| - self.client_credential['thumbprint'], |
69 |
| - scope=self.scope) |
70 |
| - else: |
71 |
| - return oauth2.ClientCredentialGrant( |
72 |
| - self.client_id, token_endpoint=token_endpoint |
73 |
| - ).get_token( |
74 |
| - scope=self.scope, client_secret=self.client_credential) |
75 |
| - |
76 |
| - |
77 |
| -import binascii |
78 |
| -import base64 |
79 |
| -import uuid |
80 |
| - |
81 |
| -import jwt |
82 |
| - |
83 |
| - |
84 |
| -def create(private_pem, thumbprint, audience, issuer, subject=None): |
85 |
| - assert private_pem.startswith('-----BEGIN PRIVATE KEY-----'), "Wrong format" |
86 |
| - payload = { # key names are all from JWT standard names |
87 |
| - 'aud': audience, |
88 |
| - 'iss': issuer, |
89 |
| - 'sub': subject or issuer, |
90 |
| - 'nbf': time.time(), |
91 |
| - 'exp': time.time() + 10*60, # 10 minutes |
92 |
| - 'jti': str(uuid.uuid4()), |
93 |
| - } |
94 |
| - # http://self-issued.info/docs/draft-jones-json-web-token-01.html |
95 |
| - h = {'x5t': base64.urlsafe_b64encode(binascii.a2b_hex(thumbprint)).decode()} |
96 |
| - return jwt.encode(payload, private_pem, algorithm='RS256', headers=h) # .decode() # TODO: Is the decode() really necessary? |
97 |
| - |
98 |
| - |
99 |
| -class ClientCredentialCertificateGrant(oauth2.ClientCredentialGrant): |
100 |
| - def get_token(self, pem, thumbprint, scope=None): |
101 |
| - JWT_BEARER = 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer' |
102 |
| - assertion = create(pem, thumbprint, self.token_endpoint, self.client_id) |
103 |
| - import logging |
104 |
| - logging.warning('assertion: %s', assertion) |
105 |
| - return super(ClientCredentialCertificateGrant, self).get_token( |
106 |
| - client_assertion_type=JWT_BEARER, client_assertion=assertion, |
107 |
| - scope=scope) |
108 |
| - |
0 commit comments