24
24
from ibm_cloud_sdk_core import ApiException , VPCInstanceTokenManager
25
25
26
26
27
+ #pylint: disable=line-too-long
28
+ TEST_ACCESS_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6ImhlbGxvIiwicm9sZSI6InVzZXIiLCJwZXJtaXNzaW9ucyI6WyJhZG1pbmlzdHJhdG9yIiwiZGVwbG95bWVudF9hZG1pbiJdLCJzdWIiOiJoZWxsbyIsImlzcyI6IkpvaG4iLCJhdWQiOiJEU1giLCJ1aWQiOiI5OTkiLCJpYXQiOjE1NjAyNzcwNTEsImV4cCI6MTU2MDI4MTgxOSwianRpIjoiMDRkMjBiMjUtZWUyZC00MDBmLTg2MjMtOGNkODA3MGI1NDY4In0.cIodB4I6CCcX8vfIImz7Cytux3GpWyObt9Gkur5g1QI'
27
29
TEST_TOKEN = 'abc123'
28
30
TEST_IAM_TOKEN = 'iam-abc123'
29
31
TEST_IAM_PROFILE_CRN = 'crn:iam-profile:123'
@@ -85,6 +87,7 @@ def test_retrieve_instance_identity_token(caplog):
85
87
assert caplog .record_tuples [0 ][2 ] == 'Invoking VPC \' create_access_token\' operation: http://someurl.com/instance_identity/v1/token'
86
88
assert caplog .record_tuples [1 ][2 ] == 'Returned from VPC \' create_access_token\' operation."'
87
89
90
+
88
91
@responses .activate
89
92
def test_retrieve_instance_identity_token_failed (caplog ):
90
93
caplog .set_level (logging .DEBUG )
@@ -109,6 +112,7 @@ def test_retrieve_instance_identity_token_failed(caplog):
109
112
#pylint: disable=line-too-long
110
113
assert caplog .record_tuples [0 ][2 ] == 'Invoking VPC \' create_access_token\' operation: http://someurl.com/instance_identity/v1/token'
111
114
115
+
112
116
@responses .activate
113
117
def test_request_token_with_crn (caplog ):
114
118
caplog .set_level (logging .DEBUG )
@@ -176,7 +180,7 @@ def mock_retrieve_instance_identity_token():
176
180
177
181
178
182
@responses .activate
179
- def test_request_token_with_failed (caplog ):
183
+ def test_request_token_failed (caplog ):
180
184
caplog .set_level (logging .DEBUG )
181
185
182
186
token_manager = VPCInstanceTokenManager (
@@ -201,3 +205,31 @@ def mock_retrieve_instance_identity_token():
201
205
# Check the logs.
202
206
#pylint: disable=line-too-long
203
207
assert caplog .record_tuples [0 ][2 ] == 'Invoking VPC \' create_iam_token\' operation: http://169.254.169.254/instance_identity/v1/iam_token'
208
+
209
+
210
+ @responses .activate
211
+ def test_access_token ():
212
+ token_manager = VPCInstanceTokenManager (
213
+ iam_profile_id = TEST_IAM_PROFILE_ID ,
214
+ )
215
+
216
+ response_ii = {
217
+ 'access_token' : TEST_TOKEN ,
218
+ }
219
+ response_iam = {
220
+ 'access_token' : TEST_ACCESS_TOKEN ,
221
+ }
222
+
223
+ responses .add (responses .PUT , 'http://169.254.169.254/instance_identity/v1/token' ,
224
+ body = json .dumps (response_ii ), status = 200 )
225
+ responses .add (responses .POST , 'http://169.254.169.254/instance_identity/v1/iam_token' ,
226
+ body = json .dumps (response_iam ), status = 200 )
227
+
228
+ assert token_manager .access_token is None
229
+ assert token_manager .expire_time == 0
230
+ assert token_manager .refresh_time == 0
231
+
232
+ token_manager .get_token ()
233
+ assert token_manager .access_token == TEST_ACCESS_TOKEN
234
+ assert token_manager .expire_time > 0
235
+ assert token_manager .refresh_time > 0
0 commit comments