18
18
import json
19
19
import logging
20
20
import time
21
+ import urllib
21
22
22
23
import jwt
23
24
import responses
24
25
25
26
from ibm_cloud_sdk_core import IAMAssumeTokenManager
27
+ from ibm_cloud_sdk_core .api_exception import ApiException
26
28
from .utils .logger_utils import setup_test_logger
27
29
28
30
29
- MY_PROFILE_ID = 'my-profile-id'
30
- MY_PROFILE_CRN = 'my-profile-crn'
31
- MY_PROFILE_NAME = 'my-profile-name'
32
- MY_ACCOUNT_ID = 'my-account_id'
33
-
34
-
35
31
setup_test_logger (logging .ERROR )
36
32
37
33
38
34
def _get_current_time () -> int :
39
35
return int (time .time ())
40
36
41
37
38
+ IAM_URL = "https://iam.cloud.ibm.com/identity/token"
39
+ MY_PROFILE_ID = 'my-profile-id'
40
+ MY_PROFILE_CRN = 'my-profile-crn'
41
+ MY_PROFILE_NAME = 'my-profile-name'
42
+ MY_ACCOUNT_ID = 'my-account_id'
43
+
44
+
45
+ # The base layout of an access token.
42
46
ACCESS_TOKEN_LAYOUT = {
43
47
"username" : "dummy" ,
44
48
"role" : "Admin" ,
@@ -50,33 +54,53 @@ def _get_current_time() -> int:
50
54
"iat" : _get_current_time (),
51
55
"exp" : _get_current_time () + 3600 ,
52
56
}
53
-
57
+ # Create two different access tokens by using different secrets for the encoding.
54
58
ACCESS_TOKEN = jwt .encode (
55
59
ACCESS_TOKEN_LAYOUT , 'secret' , algorithm = 'HS256' , headers = {'kid' : '230498151c214b788dd97f22b85410a5' }
56
60
)
61
+ OTHER_ACCESS_TOKEN = jwt .encode (
62
+ ACCESS_TOKEN_LAYOUT , 'other_secret' , algorithm = 'HS256' , headers = {'kid' : '230498151c214b788dd97f22b85410a5' }
63
+ )
64
+ # Create a base response and serialize it to a JSON string to avoid doing that in each test case.
65
+ BASE_RESPONSE = {
66
+ "access_token" : ACCESS_TOKEN ,
67
+ "token_type" : "Bearer" ,
68
+ "expires_in" : 3600 ,
69
+ "expiration" : _get_current_time () + 3600 ,
70
+ "refresh_token" : "not_available" ,
71
+ }
72
+ BASE_RESPONSE_JSON = json .dumps (BASE_RESPONSE )
73
+ # Create a second base response just like we did above, but use the other access token.
74
+ OTHER_BASE_RESPONSE = BASE_RESPONSE .copy ()
75
+ OTHER_BASE_RESPONSE ['access_token' ] = OTHER_ACCESS_TOKEN
76
+ OTHER_BASE_RESPONSE_JSON = json .dumps (OTHER_BASE_RESPONSE )
77
+
78
+
79
+ def request_callback (request ):
80
+ """Parse the form data, and return a response based on the `grant_type` value."""
81
+ form_data = urllib .parse .unquote (request .body )
82
+ params = dict (param .split ('=' ) for param in form_data .split ('&' ))
83
+ if params .get ('grant_type' ) == 'urn:ibm:params:oauth:grant-type:apikey' :
84
+ return (200 , {}, BASE_RESPONSE_JSON )
85
+ if params .get ('grant_type' ) == 'urn:ibm:params:oauth:grant-type:assume' :
86
+ return (200 , {}, OTHER_BASE_RESPONSE_JSON )
87
+
88
+ raise ApiException (400 )
57
89
58
90
59
91
@responses .activate
60
92
def test_request_token_with_profile_id ():
61
- iam_url = "https://iam.cloud.ibm.com/identity/token"
62
- response = {
63
- "access_token" : ACCESS_TOKEN ,
64
- "token_type" : "Bearer" ,
65
- "expires_in" : 3600 ,
66
- "expiration" : _get_current_time () + 3600 ,
67
- "refresh_token" : "jy4gl91BQ" ,
68
- }
69
- responses .add (responses .POST , url = iam_url , body = json .dumps (response ), status = 200 )
93
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
70
94
71
95
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = MY_PROFILE_ID )
72
96
73
97
# Make sure we don't have an access token yet.
74
- assert token_manager .request_payload .get ('access_token' , None ) is None
98
+ assert token_manager .request_payload .get ('access_token' ) is None
75
99
76
100
token_manager .request_token ()
77
101
78
102
# Now the access token should be set along with the profile ID.
79
- assert token_manager .request_payload .get ('access_token' ) is not None
103
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
80
104
assert token_manager .request_payload .get ('profile_id' ) == MY_PROFILE_ID
81
105
assert token_manager .request_payload .get ('profile_crn' ) is None
82
106
assert token_manager .request_payload .get ('profile_name' ) is None
@@ -85,25 +109,17 @@ def test_request_token_with_profile_id():
85
109
86
110
@responses .activate
87
111
def test_request_token_with_profile_crn ():
88
- iam_url = "https://iam.cloud.ibm.com/identity/token"
89
- response = {
90
- "access_token" : ACCESS_TOKEN ,
91
- "token_type" : "Bearer" ,
92
- "expires_in" : 3600 ,
93
- "expiration" : _get_current_time () + 3600 ,
94
- "refresh_token" : "jy4gl91BQ" ,
95
- }
96
- responses .add (responses .POST , url = iam_url , body = json .dumps (response ), status = 200 )
112
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
97
113
98
114
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_crn = MY_PROFILE_CRN )
99
115
100
116
# Make sure we don't have an access token yet.
101
- assert token_manager .request_payload .get ('access_token' , None ) is None
117
+ assert token_manager .request_payload .get ('access_token' ) is None
102
118
103
119
token_manager .request_token ()
104
120
105
121
# Now the access token should be set along with the profile ID.
106
- assert token_manager .request_payload .get ('access_token' ) is not None
122
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
107
123
assert token_manager .request_payload .get ('profile_id' ) is None
108
124
assert token_manager .request_payload .get ('profile_crn' ) == MY_PROFILE_CRN
109
125
assert token_manager .request_payload .get ('profile_name' ) is None
@@ -112,25 +128,17 @@ def test_request_token_with_profile_crn():
112
128
113
129
@responses .activate
114
130
def test_request_token_with_profile_name_and_account_id ():
115
- iam_url = "https://iam.cloud.ibm.com/identity/token"
116
- response = {
117
- "access_token" : ACCESS_TOKEN ,
118
- "token_type" : "Bearer" ,
119
- "expires_in" : 3600 ,
120
- "expiration" : _get_current_time () + 3600 ,
121
- "refresh_token" : "jy4gl91BQ" ,
122
- }
123
- responses .add (responses .POST , url = iam_url , body = json .dumps (response ), status = 200 )
131
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
124
132
125
133
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_name = MY_PROFILE_NAME , iam_account_id = MY_ACCOUNT_ID )
126
134
127
135
# Make sure we don't have an access token yet.
128
- assert token_manager .request_payload .get ('access_token' , None ) is None
136
+ assert token_manager .request_payload .get ('access_token' ) is None
129
137
130
138
token_manager .request_token ()
131
139
132
140
# Now the access token should be set along with the profile ID.
133
- assert token_manager .request_payload .get ('access_token' ) is not None
141
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
134
142
assert token_manager .request_payload .get ('profile_id' ) is None
135
143
assert token_manager .request_payload .get ('profile_crn' ) is None
136
144
assert token_manager .request_payload .get ('profile_name' ) == MY_PROFILE_NAME
@@ -140,15 +148,7 @@ def test_request_token_with_profile_name_and_account_id():
140
148
@responses .activate
141
149
def test_request_token_uses_the_correct_grant_types ():
142
150
iam_url = "https://iam.cloud.ibm.com/identity/token"
143
- response = {
144
- "access_token" : ACCESS_TOKEN ,
145
- "token_type" : "Bearer" ,
146
- "expires_in" : 3600 ,
147
- "expiration" : _get_current_time () + 3600 ,
148
- "refresh_token" : "jy4gl91BQ" ,
149
- }
150
- response_json = json .dumps (response )
151
- responses .add (responses .POST , url = iam_url , body = response_json , status = 200 )
151
+ responses .add (responses .POST , url = iam_url , body = BASE_RESPONSE_JSON , status = 200 )
152
152
153
153
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = 'my_profile_id' )
154
154
token_manager .request_token ()
@@ -159,16 +159,7 @@ def test_request_token_uses_the_correct_grant_types():
159
159
160
160
@responses .activate
161
161
def test_request_token_uses_the_correct_headers ():
162
- iam_url = "https://iam.cloud.ibm.com/identity/token"
163
- response = {
164
- "access_token" : ACCESS_TOKEN ,
165
- "token_type" : "Bearer" ,
166
- "expires_in" : 3600 ,
167
- "expiration" : _get_current_time () + 3600 ,
168
- "refresh_token" : "jy4gl91BQ" ,
169
- }
170
- response_json = json .dumps (response )
171
- responses .add (responses .POST , url = iam_url , body = response_json , status = 200 )
162
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
172
163
173
164
token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = 'my_profile_id' )
174
165
token_manager .request_token ()
@@ -178,3 +169,24 @@ def test_request_token_uses_the_correct_headers():
178
169
assert (
179
170
responses .calls [1 ].request .headers .get ('User-Agent' ).startswith ('ibm-python-sdk-core/iam-assume-authenticator' )
180
171
)
172
+
173
+
174
+ @responses .activate
175
+ def test_get_token ():
176
+ responses .add_callback (responses .POST , url = IAM_URL , callback = request_callback )
177
+
178
+ token_manager = IAMAssumeTokenManager ("apikey" , iam_profile_id = MY_PROFILE_ID )
179
+
180
+ # Make sure we don't have an access token yet.
181
+ assert token_manager .request_payload .get ('access_token' ) is None
182
+
183
+ access_token = token_manager .get_token ()
184
+ # Now the access token should be set along with the profile ID.
185
+ assert token_manager .request_payload .get ('access_token' ) == ACCESS_TOKEN
186
+ assert token_manager .request_payload .get ('profile_id' ) == MY_PROFILE_ID
187
+ assert token_manager .request_payload .get ('profile_crn' ) is None
188
+ assert token_manager .request_payload .get ('profile_name' ) is None
189
+ assert token_manager .request_payload .get ('account_id' ) is None
190
+
191
+ # The final result should be the other access token, which belong to the "assume" request.
192
+ assert access_token == OTHER_ACCESS_TOKEN
0 commit comments