Skip to content

Commit 80006d7

Browse files
committed
test: cover the container auth related code
1 parent 0d9a1a0 commit 80006d7

File tree

5 files changed

+345
-0
lines changed

5 files changed

+345
-0
lines changed

resources/cr-token.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
cr-token-1
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
SERVICE_1_AUTH_TYPE=container
2+
SERVICE_1_CR_TOKEN_FILENAME=crtoken.txt
3+
SERVICE_1_IAM_PROFILE_NAME=iam-user1
4+
SERVICE_1_IAM_PROFILE_ID=iam-id1
5+
SERVICE_1_AUTH_URL=https://iamhost/iam/api
6+
SERVICE_1_SCOPE=scope1
7+
SERVICE_1_CLIENT_ID=iam-client1
8+
SERVICE_1_CLIENT_SECRET=iam-secret1
9+
SERVICE_1_AUTH_DISABLE_SSL=true

test/test_container_authenticator.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# pylint: disable=missing-docstring
2+
import pytest
3+
4+
from ibm_cloud_sdk_core.authenticators import ContainerAuthenticator
5+
6+
7+
def test_container_authenticator():
8+
authenticator = ContainerAuthenticator(iam_profile_name='iam-user-123')
9+
assert authenticator is not None
10+
assert authenticator.token_manager.client_id is None
11+
assert authenticator.token_manager.client_secret is None
12+
assert authenticator.token_manager.disable_ssl_verification is False
13+
assert authenticator.token_manager.headers is None
14+
assert authenticator.token_manager.proxies is None
15+
assert authenticator.token_manager.scope is None
16+
17+
authenticator.set_cr_token_filename('path/to/token')
18+
assert authenticator.token_manager.cr_token_filename == 'path/to/token'
19+
20+
authenticator.set_iam_profile_name('iam-user-123')
21+
assert authenticator.token_manager.iam_profile_name == 'iam-user-123'
22+
23+
authenticator.set_iam_profile_id('iam-id-123')
24+
assert authenticator.token_manager.iam_profile_id == 'iam-id-123'
25+
26+
authenticator.set_client_id_and_secret('tom', 'jerry')
27+
assert authenticator.token_manager.client_id == 'tom'
28+
assert authenticator.token_manager.client_secret == 'jerry'
29+
30+
authenticator.set_scope('scope1 scope2 scope3')
31+
assert authenticator.token_manager.scope == 'scope1 scope2 scope3'
32+
33+
with pytest.raises(TypeError) as err:
34+
authenticator.set_headers('dummy')
35+
assert str(err.value) == 'headers must be a dictionary'
36+
37+
authenticator.set_headers({'dummy': 'headers'})
38+
assert authenticator.token_manager.headers == {'dummy': 'headers'}
39+
40+
with pytest.raises(TypeError) as err:
41+
authenticator.set_proxies('dummy')
42+
assert str(err.value) == 'proxies must be a dictionary'
43+
44+
authenticator.set_proxies({'dummy': 'proxies'})
45+
assert authenticator.token_manager.proxies == {'dummy': 'proxies'}
46+
47+
48+
def test_container_authenticator_with_scope():
49+
authenticator = ContainerAuthenticator(iam_profile_name='iam-user-123', scope='scope1 scope2')
50+
assert authenticator is not None
51+
assert authenticator.token_manager.scope == 'scope1 scope2'
52+
53+
54+
def test_authenticator_validate_failed():
55+
with pytest.raises(ValueError) as err:
56+
ContainerAuthenticator(None)
57+
assert str(err.value) == 'At least one of iam_profile_name or iam_profile_id must be specified.'
58+
59+
with pytest.raises(ValueError) as err:
60+
ContainerAuthenticator(iam_profile_name='iam-user-123', client_id='my_client_id')
61+
assert str(
62+
err.value) == 'Both client_id and client_secret should be initialized.'
63+
64+
with pytest.raises(ValueError) as err:
65+
ContainerAuthenticator(iam_profile_name='iam-user-123', client_secret='my_client_secret')
66+
assert str(
67+
err.value) == 'Both client_id and client_secret should be initialized.'

test/test_container_token_manager.py

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
# pylint: disable=missing-docstring
2+
import json
3+
import time
4+
from urllib.parse import parse_qs
5+
6+
import responses
7+
import pytest
8+
9+
from ibm_cloud_sdk_core import ApiException, ContainerTokenManager
10+
from ibm_cloud_sdk_core.authenticators import ContainerAuthenticator
11+
12+
# pylint: disable=line-too-long
13+
TEST_ACCESS_TOKEN_1 = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6ImhlbGxvIiwicm9sZSI6InVzZXIiLCJwZXJtaXNzaW9ucyI6WyJhZG1pbmlzdHJhdG9yIiwiZGVwbG95bWVudF9hZG1pbiJdLCJzdWIiOiJoZWxsbyIsImlzcyI6IkpvaG4iLCJhdWQiOiJEU1giLCJ1aWQiOiI5OTkiLCJpYXQiOjE1NjAyNzcwNTEsImV4cCI6MTU2MDI4MTgxOSwianRpIjoiMDRkMjBiMjUtZWUyZC00MDBmLTg2MjMtOGNkODA3MGI1NDY4In0.cIodB4I6CCcX8vfIImz7Cytux3GpWyObt9Gkur5g1QI'
14+
TEST_ACCESS_TOKEN_2 = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiIsImtpZCI6IjIzMDQ5ODE1MWMyMTRiNzg4ZGQ5N2YyMmI4NTQxMGE1In0.eyJ1c2VybmFtZSI6ImR1bW15Iiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsIm1hbmFnZV9jYXRhbG9nIl0sInN1YiI6ImFkbWluIiwiaXNzIjoic3NzIiwiYXVkIjoic3NzIiwidWlkIjoic3NzIiwiaWF0IjozNjAwLCJleHAiOjE2MjgwMDcwODF9.zvUDpgqWIWs7S1CuKv40ERw1IZ5FqSFqQXsrwZJyfRM'
15+
TEST_REFRESH_TOKEN = 'Xj7Gle500MachEOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6ImhlbGxvIiwicm9sZSI6InVzZXIiLCJwZXJtaXNzaW9ucyI6WyJhZG1pbmlzdHJhdG9yIiwiZGVwbG95bWVudF9hZG1pbiJdLCJzdWIiOiJoZWxsbyIsImlzcyI6IkpvaG4iLCJhdWQiOiJEU1giLCJ1aWQiOiI5OTkiLCJpYXQiOjE1NjAyNzcwNTEsImV4cCI6MTU2MDI4MTgxOSwianRpIjoiMDRkMjBiMjUtZWUyZC00MDBmLTg2MjMtOGNkODA3MGI1NDY4In0.cIodB4I6CCcX8vfIImz7Cytux3GpWyObt9Gkur5g1QI'
16+
MOCK_CR_TOKEN_FILE = './resources/cr-token.txt'
17+
MOCK_IAM_PROFILE_NAME = 'iam-user-123'
18+
MOCK_CLIENT_ID = 'client-id-1'
19+
MOCK_CLIENT_SECRET = 'client-secret-1'
20+
21+
22+
def _get_current_time() -> int:
23+
return int(time.time())
24+
25+
26+
def mock_iam_response(func):
27+
"""This is decorator function which extends `responses.activate`.
28+
This sets up all the mock response stuffs.
29+
"""
30+
def callback(request):
31+
assert request.headers['Accept'] == 'application/json'
32+
assert request.headers['Content-Type'] == 'application/x-www-form-urlencoded'
33+
34+
payload = parse_qs(request.body)
35+
36+
assert payload['cr_token'][0] == 'cr-token-1'
37+
assert payload['grant_type'][0] == 'urn:ibm:params:oauth:grant-type:cr-token'
38+
assert payload['profile_name'][0] or payload['iam_profile_id']
39+
40+
status_code = 200
41+
42+
scope = payload.get('scope')[0] if payload.get('scope') else None
43+
if scope == 'send-second-token':
44+
access_token = TEST_ACCESS_TOKEN_2
45+
elif scope == 'status-bad-request':
46+
access_token = None
47+
status_code = 400
48+
elif scope == 'check-basic-auth':
49+
assert request.headers['Authorization'] == 'Basic Y2xpZW50LWlkLTE6Y2xpZW50LXNlY3JldC0x'
50+
access_token = TEST_ACCESS_TOKEN_1
51+
else:
52+
access_token = TEST_ACCESS_TOKEN_1
53+
54+
response = json.dumps({
55+
'access_token': access_token,
56+
'token_type': 'Bearer',
57+
'expires_in': 3600,
58+
'expiration': _get_current_time()+3600,
59+
'refresh_token': TEST_REFRESH_TOKEN,
60+
})
61+
62+
return (status_code, {}, response)
63+
64+
@responses.activate
65+
def wrapper():
66+
response = responses.CallbackResponse(
67+
method=responses.POST,
68+
url='https://iam.cloud.ibm.com/identity/token',
69+
callback=callback,
70+
)
71+
72+
responses.add(response)
73+
74+
func()
75+
76+
return wrapper
77+
78+
79+
@mock_iam_response
80+
def test_request_token_auth_default():
81+
iam_url = "https://iam.cloud.ibm.com/identity/token"
82+
83+
token_manager = ContainerTokenManager(
84+
cr_token_filename=MOCK_CR_TOKEN_FILE,
85+
iam_profile_name=MOCK_IAM_PROFILE_NAME,
86+
)
87+
token_manager.request_token()
88+
89+
assert len(responses.calls) == 1
90+
assert responses.calls[0].request.url == iam_url
91+
assert responses.calls[0].request.headers.get('Authorization') is None
92+
assert json.loads(responses.calls[0].response.text)['access_token'] == TEST_ACCESS_TOKEN_1
93+
94+
95+
@mock_iam_response
96+
def test_request_token_auth_in_ctor():
97+
default_auth_header = 'Basic Yng6Yng='
98+
token_manager = ContainerTokenManager(
99+
cr_token_filename=MOCK_CR_TOKEN_FILE,
100+
iam_profile_name=MOCK_IAM_PROFILE_NAME,
101+
client_id='foo',
102+
client_secret='bar')
103+
104+
token_manager.request_token()
105+
106+
assert len(responses.calls) == 1
107+
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
108+
assert json.loads(responses.calls[0].response.text)['access_token'] == TEST_ACCESS_TOKEN_1
109+
assert 'scope' not in responses.calls[0].response.request.body
110+
111+
112+
@mock_iam_response
113+
def test_request_token_auth_in_ctor_with_scope():
114+
default_auth_header = 'Basic Yng6Yng='
115+
token_manager = ContainerTokenManager(
116+
cr_token_filename=MOCK_CR_TOKEN_FILE,
117+
iam_profile_name=MOCK_IAM_PROFILE_NAME,
118+
client_id='foo',
119+
client_secret='bar',
120+
scope='john snow')
121+
122+
token_manager.request_token()
123+
124+
assert len(responses.calls) == 1
125+
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
126+
assert json.loads(responses.calls[0].response.text)['access_token'] == TEST_ACCESS_TOKEN_1
127+
assert 'scope=john+snow' in responses.calls[0].response.request.body
128+
129+
130+
def test_retrieve_cr_token_success():
131+
token_manager = ContainerTokenManager(
132+
cr_token_filename=MOCK_CR_TOKEN_FILE,
133+
)
134+
135+
cr_token = token_manager.retrieve_cr_token()
136+
137+
assert cr_token == 'cr-token-1'
138+
139+
140+
def test_retrieve_cr_token_success_fail():
141+
token_manager = ContainerTokenManager(
142+
cr_token_filename='bogus-cr-token-file',
143+
)
144+
145+
with pytest.raises(Exception) as err:
146+
token_manager.retrieve_cr_token()
147+
148+
assert str(err.value) == 'Unable to retrieve the CR token value from file bogus-cr-token-file: [Errno 2] No such file or directory: \'bogus-cr-token-file\''
149+
150+
151+
@mock_iam_response
152+
def test_get_token_success():
153+
token_manager = ContainerTokenManager(
154+
cr_token_filename=MOCK_CR_TOKEN_FILE,
155+
iam_profile_name=MOCK_IAM_PROFILE_NAME,
156+
)
157+
158+
cr_token = token_manager.access_token
159+
assert cr_token is None
160+
161+
cr_token = token_manager.get_token()
162+
assert cr_token == TEST_ACCESS_TOKEN_1
163+
assert token_manager.access_token == TEST_ACCESS_TOKEN_1
164+
165+
# Verify the token manager return the cached value.
166+
# Before we call the `get_token` again, set the expiration and time.
167+
# This is necessary because we are using a fix JWT response.
168+
token_manager.expire_time = _get_current_time() + 3600
169+
token_manager.refresh_time = _get_current_time() + 3600
170+
token_manager.set_scope('send-second-token')
171+
cr_token = token_manager.get_token()
172+
assert cr_token == TEST_ACCESS_TOKEN_1
173+
assert token_manager.access_token == TEST_ACCESS_TOKEN_1
174+
175+
# Force expiration to get the second token.
176+
token_manager.expire_time = _get_current_time() - 1
177+
cr_token = token_manager.get_token()
178+
assert cr_token == TEST_ACCESS_TOKEN_2
179+
assert token_manager.access_token == TEST_ACCESS_TOKEN_2
180+
181+
182+
@mock_iam_response
183+
def test_request_token_success():
184+
token_manager = ContainerTokenManager(
185+
cr_token_filename=MOCK_CR_TOKEN_FILE,
186+
iam_profile_name=MOCK_IAM_PROFILE_NAME,
187+
)
188+
189+
token_response = token_manager.request_token()
190+
assert token_response['access_token'] == TEST_ACCESS_TOKEN_1
191+
192+
193+
@mock_iam_response
194+
def test_authenticate_success():
195+
authenticator = ContainerAuthenticator(
196+
cr_token_filename=MOCK_CR_TOKEN_FILE,
197+
iam_profile_name='iam-user-123')
198+
199+
request = {'headers': {}}
200+
201+
authenticator.authenticate(request)
202+
assert request['headers']['Authorization'] == 'Bearer ' + TEST_ACCESS_TOKEN_1
203+
204+
# Verify the token manager return the cached value.
205+
# Before we call the `get_token` again, set the expiration and time.
206+
# This is necessary because we are using a fix JWT response.
207+
authenticator.token_manager.expire_time = _get_current_time() + 3600
208+
authenticator.token_manager.refresh_time = _get_current_time() + 3600
209+
authenticator.token_manager.set_scope('send-second-token')
210+
authenticator.authenticate(request)
211+
assert request['headers']['Authorization'] == 'Bearer ' + TEST_ACCESS_TOKEN_1
212+
213+
# Force expiration to get the second token.
214+
authenticator.token_manager.expire_time = _get_current_time() - 1
215+
authenticator.authenticate(request)
216+
assert request['headers']['Authorization'] == 'Bearer ' + TEST_ACCESS_TOKEN_2
217+
218+
219+
@mock_iam_response
220+
def test_authenticate_fail_no_cr_token():
221+
authenticator = ContainerAuthenticator(
222+
cr_token_filename='bogus-cr-token-file',
223+
iam_profile_name='iam-user-123',
224+
url='https://bogus.iam.endpoint')
225+
226+
request = {'headers': {}}
227+
228+
with pytest.raises(Exception) as err:
229+
authenticator.authenticate(request)
230+
231+
assert str(err.value) == 'Unable to retrieve the CR token value from file bogus-cr-token-file: [Errno 2] No such file or directory: \'bogus-cr-token-file\''
232+
233+
234+
@mock_iam_response
235+
def test_authenticate_fail_iam():
236+
authenticator = ContainerAuthenticator(
237+
cr_token_filename=MOCK_CR_TOKEN_FILE,
238+
iam_profile_name='iam-user-123',
239+
scope='status-bad-request')
240+
241+
request = {'headers': {}}
242+
243+
with pytest.raises(ApiException) as err:
244+
authenticator.authenticate(request)
245+
246+
assert str(err.value) == 'Error: Bad Request, Code: 400'
247+
248+
249+
@mock_iam_response
250+
def test_client_id_and_secret():
251+
token_manager = ContainerTokenManager(
252+
cr_token_filename=MOCK_CR_TOKEN_FILE,
253+
iam_profile_name=MOCK_IAM_PROFILE_NAME,
254+
)
255+
256+
token_manager.set_client_id_and_secret(MOCK_CLIENT_ID, MOCK_CLIENT_SECRET)
257+
token_manager.set_scope('check-basic-auth')
258+
access_token = token_manager.get_token()
259+
assert access_token == TEST_ACCESS_TOKEN_1

test/test_utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,15 @@ def test_get_authenticator_from_credential_file():
291291
assert authenticator.username == 'my_username'
292292
del os.environ['IBM_CREDENTIALS_FILE']
293293

294+
file_path = os.path.join(os.path.dirname(__file__),
295+
'../resources/ibm-credentials-container.env')
296+
os.environ['IBM_CREDENTIALS_FILE'] = file_path
297+
authenticator = get_authenticator_from_environment('service 1')
298+
assert authenticator is not None
299+
assert authenticator.token_manager.cr_token_filename == 'crtoken.txt'
300+
assert authenticator.token_manager.iam_profile_name == 'iam-user1'
301+
del os.environ['IBM_CREDENTIALS_FILE']
302+
294303
file_path = os.path.join(os.path.dirname(__file__),
295304
'../resources/ibm-credentials-cp4d.env')
296305
os.environ['IBM_CREDENTIALS_FILE'] = file_path

0 commit comments

Comments
 (0)