18
18
import json
19
19
import logging
20
20
import time
21
+ import urllib
21
22
22
23
import jwt
23
24
import responses
26
27
from .utils .logger_utils import setup_test_logger
27
28
28
29
29
- MY_PROFILE_ID = 'my-profile-id'
30
- MY_PROFILE_CRN = 'my-profile-crn'
31
- MY_PROFILE_NAME = 'my-profile-name'
32
- MY_ACCOUNT_ID = 'my-account_id'
33
-
34
-
35
30
setup_test_logger (logging .ERROR )
36
31
37
32
38
33
def _get_current_time () -> int :
39
34
return int (time .time ())
40
35
41
36
37
+ IAM_URL = "https://iam.cloud.ibm.com/identity/token"
38
+ MY_PROFILE_ID = 'my-profile-id'
39
+ MY_PROFILE_CRN = 'my-profile-crn'
40
+ MY_PROFILE_NAME = 'my-profile-name'
41
+ MY_ACCOUNT_ID = 'my-account_id'
42
+
43
+
44
+ # The base layout of an access token.
42
45
ACCESS_TOKEN_LAYOUT = {
43
46
"username" : "dummy" ,
44
47
"role" : "Admin" ,
@@ -50,33 +53,53 @@ def _get_current_time() -> int:
50
53
"iat" : _get_current_time (),
51
54
"exp" : _get_current_time () + 3600 ,
52
55
}
53
-
56
+ # Create two different access tokens by using different secrets for the encoding.
54
57
ACCESS_TOKEN = jwt .encode (
55
58
ACCESS_TOKEN_LAYOUT , 'secret' , algorithm = 'HS256' , headers = {'kid' : '230498151c214b788dd97f22b85410a5' }
56
59
)
60
+ OTHER_ACCESS_TOKEN = jwt .encode (
61
+ ACCESS_TOKEN_LAYOUT , 'other_secret' , algorithm = 'HS256' , headers = {'kid' : '230498151c214b788dd97f22b85410a5' }
62
+ )
63
+ # Create a base response and serialize it to a JSON string to avoid doing that in each test case.
64
+ BASE_RESPONSE = {
65
+ "access_token" : ACCESS_TOKEN ,
66
+ "token_type" : "Bearer" ,
67
+ "expires_in" : 3600 ,
68
+ "expiration" : _get_current_time () + 3600 ,
69
+ "refresh_token" : "not_available" ,
70
+ }
71
+ BASE_RESPONSE_JSON = json .dumps (BASE_RESPONSE )
72
+ # Create a second base response just like we did above, but use the other access token.
73
+ OTHER_BASE_RESPONSE = BASE_RESPONSE .copy ()
74
+ OTHER_BASE_RESPONSE ['access_token' ] = OTHER_ACCESS_TOKEN
75
+ OTHER_BASE_RESPONSE_JSON = json .dumps (OTHER_BASE_RESPONSE )
76
+
77
+
78
+ def request_callback (request ):
79
+ """Parse the form data, and return a response based on the `grant_type` value."""
80
+ form_data = urllib .parse .unquote (request .body )
81
+ params = dict (param .split ('=' ) for param in form_data .split ('&' ))
82
+ if params .get ('grant_type' ) == 'urn:ibm:params:oauth:grant-type:apikey' :
83
+ return (200 , {}, BASE_RESPONSE_JSON )
84
+ if params .get ('grant_type' ) == 'urn:ibm:params:oauth:grant-type:assume' :
85
+ return (200 , {}, OTHER_BASE_RESPONSE_JSON )
86
+ else :
87
+ raise Exception ("Invalid request" )
57
88
58
89
59
90
@responses .activate
60
91
def test_request_token_with_profile_id ():
61
- iam_url = "https://iam.cloud.ibm.com/identity/token"
62
- response = {
63
- "access_token" : ACCESS_TOKEN ,
64
- "token_type" : "Bearer" ,
65
- "expires_in" : 3600 ,
66
- "expiration" : _get_current_time () + 3600 ,
67
- "refresh_token" : "jy4gl91BQ" ,
68
- }
69
- responses .add (responses .POST , url = iam_url , body = json .dumps (response ), status = 200 )
92
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
70
93
71
94
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = MY_PROFILE_ID )
72
95
73
96
# Make sure we don't have an access token yet.
74
- assert token_manager .request_payload .get ('access_token' , None ) is None
97
+ assert token_manager .request_payload .get ('access_token' ) is None
75
98
76
99
token_manager .request_token ()
77
100
78
101
# Now the access token should be set along with the profile ID.
79
- assert token_manager .request_payload .get ('access_token' ) is not None
102
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
80
103
assert token_manager .request_payload .get ('profile_id' ) == MY_PROFILE_ID
81
104
assert token_manager .request_payload .get ('profile_crn' ) is None
82
105
assert token_manager .request_payload .get ('profile_name' ) is None
@@ -85,25 +108,17 @@ def test_request_token_with_profile_id():
85
108
86
109
@responses .activate
87
110
def test_request_token_with_profile_crn ():
88
- iam_url = "https://iam.cloud.ibm.com/identity/token"
89
- response = {
90
- "access_token" : ACCESS_TOKEN ,
91
- "token_type" : "Bearer" ,
92
- "expires_in" : 3600 ,
93
- "expiration" : _get_current_time () + 3600 ,
94
- "refresh_token" : "jy4gl91BQ" ,
95
- }
96
- responses .add (responses .POST , url = iam_url , body = json .dumps (response ), status = 200 )
111
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
97
112
98
113
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_crn = MY_PROFILE_CRN )
99
114
100
115
# Make sure we don't have an access token yet.
101
- assert token_manager .request_payload .get ('access_token' , None ) is None
116
+ assert token_manager .request_payload .get ('access_token' ) is None
102
117
103
118
token_manager .request_token ()
104
119
105
120
# Now the access token should be set along with the profile ID.
106
- assert token_manager .request_payload .get ('access_token' ) is not None
121
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
107
122
assert token_manager .request_payload .get ('profile_id' ) is None
108
123
assert token_manager .request_payload .get ('profile_crn' ) == MY_PROFILE_CRN
109
124
assert token_manager .request_payload .get ('profile_name' ) is None
@@ -112,25 +127,17 @@ def test_request_token_with_profile_crn():
112
127
113
128
@responses .activate
114
129
def test_request_token_with_profile_name_and_account_id ():
115
- iam_url = "https://iam.cloud.ibm.com/identity/token"
116
- response = {
117
- "access_token" : ACCESS_TOKEN ,
118
- "token_type" : "Bearer" ,
119
- "expires_in" : 3600 ,
120
- "expiration" : _get_current_time () + 3600 ,
121
- "refresh_token" : "jy4gl91BQ" ,
122
- }
123
- responses .add (responses .POST , url = iam_url , body = json .dumps (response ), status = 200 )
130
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
124
131
125
132
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_name = MY_PROFILE_NAME , iam_account_id = MY_ACCOUNT_ID )
126
133
127
134
# Make sure we don't have an access token yet.
128
- assert token_manager .request_payload .get ('access_token' , None ) is None
135
+ assert token_manager .request_payload .get ('access_token' ) is None
129
136
130
137
token_manager .request_token ()
131
138
132
139
# Now the access token should be set along with the profile ID.
133
- assert token_manager .request_payload .get ('access_token' ) is not None
140
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
134
141
assert token_manager .request_payload .get ('profile_id' ) is None
135
142
assert token_manager .request_payload .get ('profile_crn' ) is None
136
143
assert token_manager .request_payload .get ('profile_name' ) == MY_PROFILE_NAME
@@ -140,15 +147,7 @@ def test_request_token_with_profile_name_and_account_id():
140
147
@responses .activate
141
148
def test_request_token_uses_the_correct_grant_types ():
142
149
iam_url = "https://iam.cloud.ibm.com/identity/token"
143
- response = {
144
- "access_token" : ACCESS_TOKEN ,
145
- "token_type" : "Bearer" ,
146
- "expires_in" : 3600 ,
147
- "expiration" : _get_current_time () + 3600 ,
148
- "refresh_token" : "jy4gl91BQ" ,
149
- }
150
- response_json = json .dumps (response )
151
- responses .add (responses .POST , url = iam_url , body = response_json , status = 200 )
150
+ responses .add (responses .POST , url = iam_url , body = BASE_RESPONSE_JSON , status = 200 )
152
151
153
152
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = 'my_profile_id' )
154
153
token_manager .request_token ()
@@ -159,16 +158,7 @@ def test_request_token_uses_the_correct_grant_types():
159
158
160
159
@responses .activate
161
160
def test_request_token_uses_the_correct_headers ():
162
- iam_url = "https://iam.cloud.ibm.com/identity/token"
163
- response = {
164
- "access_token" : ACCESS_TOKEN ,
165
- "token_type" : "Bearer" ,
166
- "expires_in" : 3600 ,
167
- "expiration" : _get_current_time () + 3600 ,
168
- "refresh_token" : "jy4gl91BQ" ,
169
- }
170
- response_json = json .dumps (response )
171
- responses .add (responses .POST , url = iam_url , body = response_json , status = 200 )
161
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
172
162
173
163
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = 'my_profile_id' )
174
164
token_manager .request_token ()
@@ -178,3 +168,24 @@ def test_request_token_uses_the_correct_headers():
178
168
assert (
179
169
responses .calls [1 ].request .headers .get ('User-Agent' ).startswith ('ibm-python-sdk-core/iam-assume-authenticator' )
180
170
)
171
+
172
+
173
+ @responses .activate
174
+ def test_get_token ():
175
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
176
+
177
+ token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = MY_PROFILE_ID )
178
+
179
+ # Make sure we don't have an access token yet.
180
+ assert token_manager .request_payload .get ('access_token' ) is None
181
+
182
+ access_token = token_manager .get_token ()
183
+ # Now the access token should be set along with the profile ID.
184
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
185
+ assert token_manager .request_payload .get ('profile_id' ) == MY_PROFILE_ID
186
+ assert token_manager .request_payload .get ('profile_crn' ) is None
187
+ assert token_manager .request_payload .get ('profile_name' ) is None
188
+ assert token_manager .request_payload .get ('account_id' ) is None
189
+
190
+ # The final result should be the other access token, which belong to the "assume" request.
191
+ assert access_token == OTHER_ACCESS_TOKEN
0 commit comments