@@ -58,9 +58,51 @@ def get_token(self):
58
58
59
59
class ClientCredentialRequest (BaseRequest ):
60
60
def get_token (self ):
61
- return oauth2 .ClientCredentialGrant (
62
- self .client_id ,
63
- token_endpoint = "%s%s?policy=%s" % (
64
- self .authority , self .TOKEN_ENDPOINT_PATH , self .policy ),
65
- ).get_token (scope = self .scope , client_secret = self .client_credential )
61
+ token_endpoint = "%s%s?policy=%s" % (
62
+ self .authority , self .TOKEN_ENDPOINT_PATH , self .policy )
63
+ if isinstance (self .client_credential , dict ): # certification logic
64
+ return ClientCredentialCertificateGrant (
65
+ self .client_id , token_endpoint = token_endpoint
66
+ ).get_token (
67
+ self .client_credential ['certificate' ],
68
+ self .client_credential ['thumbprint' ],
69
+ scope = self .scope )
70
+ else :
71
+ return oauth2 .ClientCredentialGrant (
72
+ self .client_id , token_endpoint = token_endpoint
73
+ ).get_token (
74
+ scope = self .scope , client_secret = self .client_credential )
75
+
76
+
77
+ import binascii
78
+ import base64
79
+ import uuid
80
+
81
+ import jwt
82
+
83
+
84
+ def create (private_pem , thumbprint , audience , issuer , subject = None ):
85
+ assert private_pem .startswith ('-----BEGIN PRIVATE KEY-----' ), "Wrong format"
86
+ payload = { # key names are all from JWT standard names
87
+ 'aud' : audience ,
88
+ 'iss' : issuer ,
89
+ 'sub' : subject or issuer ,
90
+ 'nbf' : time .time (),
91
+ 'exp' : time .time () + 10 * 60 , # 10 minutes
92
+ 'jti' : str (uuid .uuid4 ()),
93
+ }
94
+ # http://self-issued.info/docs/draft-jones-json-web-token-01.html
95
+ h = {'x5t' : base64 .urlsafe_b64encode (binascii .a2b_hex (thumbprint )).decode ()}
96
+ return jwt .encode (payload , private_pem , algorithm = 'RS256' , headers = h ) # .decode() # TODO: Is the decode() really necessary?
97
+
98
+
99
+ class ClientCredentialCertificateGrant (oauth2 .ClientCredentialGrant ):
100
+ def get_token (self , pem , thumbprint , scope = None ):
101
+ JWT_BEARER = 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer'
102
+ assertion = create (pem , thumbprint , self .token_endpoint , self .client_id )
103
+ import logging
104
+ logging .warning ('assertion: %s' , assertion )
105
+ return super (ClientCredentialCertificateGrant , self ).get_token (
106
+ client_assertion_type = JWT_BEARER , client_assertion = assertion ,
107
+ scope = scope )
66
108
0 commit comments