Skip to content

feat(IAM Authenticator): add support for optional 'scope' property #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions ibm_cloud_sdk_core/authenticators/iam_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class IAMAuthenticator(Authenticator):
proxies: Dictionary for mapping request protocol to proxy URL. Defaults to None.
proxies.http (optional): The proxy endpoint to use for HTTP requests.
proxies.https (optional): The proxy endpoint to use for HTTPS requests.
scope: The "scope" to use when fetching the bearer token from the IAM token server.
This can be used to obtain an access token with a specific scope.

Attributes:
token_manager (IAMTokenManager): Retrives and manages IAM tokens from the endpoint specified by the url.
Expand All @@ -62,11 +64,12 @@ def __init__(self,
client_secret: Optional[str] = None,
disable_ssl_verification: bool = False,
headers: Optional[Dict[str, str]] = None,
proxies: Optional[Dict[str, str]] = None) -> None:
proxies: Optional[Dict[str, str]] = None,
scope: Optional[str] = None) -> None:
self.token_manager = IAMTokenManager(
apikey, url=url, client_id=client_id, client_secret=client_secret,
disable_ssl_verification=disable_ssl_verification,
headers=headers, proxies=proxies)
headers=headers, proxies=proxies, scope=scope)
self.validate()

def validate(self) -> None:
Expand Down Expand Up @@ -147,3 +150,12 @@ def set_proxies(self, proxies: Dict[str, str]) -> None:
proxies.https (optional): The proxy endpoint to use for HTTPS requests.
"""
self.token_manager.set_proxies(proxies)

def set_scope(self, value: str) -> None:
"""Sets the "scope" parameter to use when fetching the bearer token from the IAM token server.
This can be used to obtain an access token with a specific scope.

Args:
value: A space seperated string that makes up the scope parameter.
"""
self.token_manager.set_scope(value)
3 changes: 2 additions & 1 deletion ibm_cloud_sdk_core/get_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def __construct_authenticator(config: dict) -> Authenticator:
url=config.get('AUTH_URL'),
client_id=config.get('CLIENT_ID'),
client_secret=config.get('CLIENT_SECRET'),
disable_ssl_verification=config.get('AUTH_DISABLE_SSL'))
disable_ssl_verification=config.get('AUTH_DISABLE_SSL'),
scope=config.get('SCOPE'))
elif auth_type == 'noauth':
authenticator = NoAuthAuthenticator()

Expand Down
20 changes: 19 additions & 1 deletion ibm_cloud_sdk_core/iam_token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class IAMTokenManager(JWTTokenManager):
proxies.http (str): The proxy endpoint to use for HTTP requests.
proxies.https (str): The proxy endpoint to use for HTTPS requests.
http_config (dict): A dictionary containing values that control the timeout, proxies, and etc of HTTP requests.
scope (str): The "scope" to use when fetching the bearer token from the IAM token server.
This can be used to obtain an access token with a specific scope.

Args:
apikey: A generated APIKey from ibmcloud.
Expand All @@ -54,12 +56,15 @@ class IAMTokenManager(JWTTokenManager):
proxies: Proxies to use for communicating with IAM. Defaults to None.
proxies.http: The proxy endpoint to use for HTTP requests.
proxies.https: The proxy endpoint to use for HTTPS requests.
scope: The "scope" to use when fetching the bearer token from the IAM token server.
This can be used to obtain an access token with a specific scope.
"""
DEFAULT_IAM_URL = 'https://iam.cloud.ibm.com/identity/token'
CONTENT_TYPE = 'application/x-www-form-urlencoded'
REQUEST_TOKEN_GRANT_TYPE = 'urn:ibm:params:oauth:grant-type:apikey'
REQUEST_TOKEN_RESPONSE_TYPE = 'cloud_iam'
TOKEN_NAME = 'access_token'
SCOPE = 'scope'

def __init__(self,
apikey: str,
Expand All @@ -69,13 +74,15 @@ def __init__(self,
client_secret: Optional[str] = None,
disable_ssl_verification: bool = False,
headers: Optional[Dict[str, str]] = None,
proxies: Optional[Dict[str, str]] = None) -> None:
proxies: Optional[Dict[str, str]] = None,
scope: Optional[str] = None) -> None:
self.apikey = apikey
self.url = url if url else self.DEFAULT_IAM_URL
self.client_id = client_id
self.client_secret = client_secret
self.headers = headers
self.proxies = proxies
self.scope = scope
super().__init__(
self.url, disable_ssl_verification=disable_ssl_verification, token_name=self.TOKEN_NAME)

Expand All @@ -101,6 +108,9 @@ def request_token(self) -> dict:
'response_type': self.REQUEST_TOKEN_RESPONSE_TYPE
}

if self.scope is not None and self.scope:
data[self.SCOPE] = self.scope

auth_tuple = None
# If both the client_id and secret were specified by the user, then use them
if self.client_id and self.client_secret:
Expand Down Expand Up @@ -148,3 +158,11 @@ def set_proxies(self, proxies: Dict[str, str]) -> None:
self.proxies = proxies
else:
raise TypeError('proxies must be a dictionary')

def set_scope(self, value: str) -> None:
"""Sets the "scope" parameter to use when fetching the bearer token from the IAM token server.

Args:
value: A space seperated string that makes up the scope parameter.
"""
self.scope = value
11 changes: 10 additions & 1 deletion resources/ibm-credentials.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,13 @@ SERVICE_1_APIKEY=V4HXmoUtMjohnsnow=KotN
SERVICE_1_CLIENT_ID=somefake========id
SERVICE_1_CLIENT_SECRET===my-client-secret==
SERVICE_1_AUTH_URL=https://iamhost/iam/api=
SERVICE_1_URL=service1.com/api
SERVICE_1_URL=service1.com/api

# Service2 configured with IAM w/scope
SERVICE_2_AUTH_TYPE=iam
SERVICE_2_APIKEY=V4HXmoUtMjohnsnow=KotN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No matter what @dpopp07 says, I'm ok with you spelling this "john" instead of "jon" :)

SERVICE_2_CLIENT_ID=somefake========id
SERVICE_2_CLIENT_SECRET===my-client-secret==
SERVICE_2_AUTH_URL=https://iamhost/iam/api=
SERVICE_2_URL=service1.com/api
SERVICE_2_SCOPE=A B C D
11 changes: 10 additions & 1 deletion test/test_iam_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def test_iam_authenticator():
authenticator = IAMAuthenticator('my_apikey')
authenticator = IAMAuthenticator(apikey='my_apikey')
assert authenticator is not None
assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com/identity/token'
assert authenticator.token_manager.client_id is None
Expand All @@ -18,11 +18,15 @@ def test_iam_authenticator():
assert authenticator.token_manager.headers is None
assert authenticator.token_manager.proxies is None
assert authenticator.token_manager.apikey == 'my_apikey'
assert authenticator.token_manager.scope is None

authenticator.set_client_id_and_secret('tom', 'jerry')
assert authenticator.token_manager.client_id == 'tom'
assert authenticator.token_manager.client_secret == 'jerry'

authenticator.set_scope('scope1 scope2 scope3')
assert authenticator.token_manager.scope == 'scope1 scope2 scope3'

with pytest.raises(TypeError) as err:
authenticator.set_headers('dummy')
assert str(err.value) == 'headers must be a dictionary'
Expand All @@ -37,6 +41,11 @@ def test_iam_authenticator():
authenticator.set_proxies({'dummy': 'proxies'})
assert authenticator.token_manager.proxies == {'dummy': 'proxies'}

def test_iam_authenticator_with_scope():
authenticator = IAMAuthenticator(apikey='my_apikey', scope='scope1 scope2')
assert authenticator is not None
assert authenticator.token_manager.scope == 'scope1 scope2'


def test_iam_authenticator_validate_failed():
with pytest.raises(ValueError) as err:
Expand Down
52 changes: 52 additions & 0 deletions test/test_iam_token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,29 @@ def test_request_token_auth_in_ctor():
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body

@responses.activate
def test_request_token_auth_in_ctor_with_scope():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
default_auth_header = 'Basic Yng6Yng='
responses.add(responses.POST, url=iam_url, body=response, status=200)

token_manager = IAMTokenManager("apikey", url=iam_url, client_id='foo', client_secret='bar', scope='john snow')
token_manager.request_token()

assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
assert responses.calls[0].response.text == response
assert 'scope=john+snow' in responses.calls[0].response.request.body

@responses.activate
def test_request_token_unsuccessful():
Expand Down Expand Up @@ -119,6 +142,7 @@ def test_request_token_auth_in_ctor_client_id_only():
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body

@responses.activate
def test_request_token_auth_in_ctor_secret_only():
Expand All @@ -139,6 +163,7 @@ def test_request_token_auth_in_ctor_secret_only():
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body

@responses.activate
def test_request_token_auth_in_setter():
Expand All @@ -161,6 +186,7 @@ def test_request_token_auth_in_setter():
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body

@responses.activate
def test_request_token_auth_in_setter_client_id_only():
Expand All @@ -182,6 +208,7 @@ def test_request_token_auth_in_setter_client_id_only():
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body

@responses.activate
def test_request_token_auth_in_setter_secret_only():
Expand All @@ -204,3 +231,28 @@ def test_request_token_auth_in_setter_secret_only():
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body

@responses.activate
def test_request_token_auth_in_setter_scope():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
responses.add(responses.POST, url=iam_url, body=response, status=200)

token_manager = IAMTokenManager("iam_apikey")
token_manager.set_client_id_and_secret(None, 'bar')
token_manager.set_headers({'user':'header'})
token_manager.set_scope('john snow')
token_manager.request_token()

assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope=john+snow' in responses.calls[0].response.request.body
23 changes: 23 additions & 0 deletions test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ def test_get_authenticator_from_credential_file():
assert authenticator.token_manager.client_id == 'somefake========id'
assert authenticator.token_manager.client_secret == '==my-client-secret=='
assert authenticator.token_manager.url == 'https://iamhost/iam/api='
assert authenticator.token_manager.scope is None
del os.environ['IBM_CREDENTIALS_FILE']

def test_get_authenticator_from_credential_file_scope():
file_path = os.path.join(
os.path.dirname(__file__), '../resources/ibm-credentials.env')
os.environ['IBM_CREDENTIALS_FILE'] = file_path
authenticator = get_authenticator_from_environment('service_2')
assert authenticator is not None
assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN'
assert authenticator.token_manager.client_id == 'somefake========id'
assert authenticator.token_manager.client_secret == '==my-client-secret=='
assert authenticator.token_manager.url == 'https://iamhost/iam/api='
assert authenticator.token_manager.scope == 'A B C D'
del os.environ['IBM_CREDENTIALS_FILE']

def test_get_authenticator_from_env_variables():
Expand All @@ -160,6 +174,15 @@ def test_get_authenticator_from_env_variables():
assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN'
del os.environ['SERVICE_1_APIKEY']

os.environ['SERVICE_2_APIKEY'] = 'johnsnow'
os.environ['SERVICE_2_SCOPE'] = 'A B C D'
authenticator = get_authenticator_from_environment('service_2')
assert authenticator is not None
assert authenticator.token_manager.apikey == 'johnsnow'
assert authenticator.token_manager.scope == 'A B C D'
del os.environ['SERVICE_2_APIKEY']
del os.environ['SERVICE_2_SCOPE']

def test_vcap_credentials():
vcap_services = '{"test":[{"credentials":{ \
"url":"https://gateway.watsonplatform.net/compare-comply/api",\
Expand Down