Skip to content

Commit 7fcf4f2

Browse files
committed
refactor: shared token manager class with the IAM related functions
1 parent 814a596 commit 7fcf4f2

File tree

2 files changed

+178
-109
lines changed

2 files changed

+178
-109
lines changed
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# coding: utf-8
2+
3+
# Copyright 2019 IBM All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from typing import Dict, Optional
18+
19+
from .jwt_token_manager import JWTTokenManager
20+
21+
class IAMRequestBasedTokenManager(JWTTokenManager):
22+
"""The IamRequestBasedTokenManager class contains code relevant to any token manager that
23+
interacts with the IAM service to manage a token. It stores information relevant to all
24+
IAM requests, such as the client ID and secret, and performs the token request with a set
25+
of request options common to any IAM token management scheme.
26+
27+
If the current stored bearer token has expired a new bearer token will be retrieved.
28+
29+
Attributes:
30+
request_payload(dict): the data that will be sent in the IAM OAuth token request
31+
url (str): The IAM endpoint to token requests.
32+
client_id (str): The client_id and client_secret fields are used to form
33+
a "basic auth" Authorization header for interactions with the IAM token server.
34+
client_secret (str): The client_id and client_secret fields are used to form
35+
a "basic auth" Authorization header for interactions with the IAM token server.
36+
headers (dict): Default headers to be sent with every IAM token request.
37+
proxies (dict): Proxies to use for communicating with IAM.
38+
proxies.http (str): The proxy endpoint to use for HTTP requests.
39+
proxies.https (str): The proxy endpoint to use for HTTPS requests.
40+
http_config (dict): A dictionary containing values that control the timeout, proxies, and etc of HTTP requests.
41+
scope (str): The "scope" to use when fetching the bearer token from the IAM token server.
42+
This can be used to obtain an access token with a specific scope.
43+
44+
Keyword Args:
45+
url: The IAM endpoint to token requests. Defaults to None.
46+
client_id: The client_id and client_secret fields are used to form
47+
a "basic auth" Authorization header for interactions with the IAM token server.
48+
Defaults to None.
49+
client_secret: The client_id and client_secret fields are used to form
50+
a "basic auth" Authorization header for interactions with the IAM token server.
51+
Defaults to None.
52+
disable_ssl_verification: A flag that indicates whether verification of
53+
the server's SSL certificate should be disabled or not. Defaults to False.
54+
headers: Default headers to be sent with every IAM token request. Defaults to None.
55+
proxies: Proxies to use for communicating with IAM. Defaults to None.
56+
proxies.http: The proxy endpoint to use for HTTP requests.
57+
proxies.https: The proxy endpoint to use for HTTPS requests.
58+
scope: The "scope" to use when fetching the bearer token from the IAM token server.
59+
This can be used to obtain an access token with a specific scope.
60+
"""
61+
DEFAULT_IAM_URL = 'https://iam.cloud.ibm.com'
62+
OPERATION_PATH = "/identity/token"
63+
64+
request_payload = {}
65+
66+
def __init__(self,
67+
url: Optional[str] = None,
68+
client_id: Optional[str] = None,
69+
client_secret: Optional[str] = None,
70+
disable_ssl_verification: bool = False,
71+
headers: Optional[Dict[str, str]] = None,
72+
proxies: Optional[Dict[str, str]] = None,
73+
scope: Optional[str] = None) -> None:
74+
if not url:
75+
url = self.DEFAULT_IAM_URL
76+
if url.endswith(self.OPERATION_PATH):
77+
url = url[:-len(self.OPERATION_PATH)]
78+
self.url = url
79+
self.client_id = client_id
80+
self.client_secret = client_secret
81+
self.headers = headers
82+
self.refresh_token = None
83+
self.proxies = proxies
84+
self.scope = scope
85+
super().__init__(
86+
self.url, disable_ssl_verification=disable_ssl_verification, token_name='access_token')
87+
88+
def request_token(self) -> dict:
89+
"""Request an IAM OAuth token given an API Key.
90+
91+
If client_id and client_secret are specified use their values as a user and pass auth set
92+
according to WHATWG url spec.
93+
94+
Returns:
95+
A dictionary containing the bearer token to be subsequently used service requests.
96+
"""
97+
headers = {
98+
'Content-type': 'applicaton/x-www-form-urlencoded',
99+
'Accept': 'application/json'
100+
}
101+
if self.headers is not None and isinstance(self.headers, dict):
102+
headers.update(self.headers)
103+
104+
data = dict(self.request_payload)
105+
106+
if self.scope is not None and self.scope:
107+
data['scope'] = self.scope
108+
109+
auth_tuple = None
110+
# If both the client_id and secret were specified by the user, then use them
111+
if self.client_id and self.client_secret:
112+
auth_tuple = (self.client_id, self.client_secret)
113+
114+
response = self._request(
115+
method='POST',
116+
url=(self.url + self.OPERATION_PATH) if self.url else self.url,
117+
headers=headers,
118+
data=data,
119+
auth_tuple=auth_tuple,
120+
proxies=self.proxies)
121+
return response
122+
123+
def set_client_id_and_secret(self, client_id: str, client_secret: str) -> None:
124+
"""Set the client_id and client_secret.
125+
126+
Args:
127+
client_id: The client id to be used for token requests.
128+
client_secret: The client secret to be used for token requests.
129+
"""
130+
self.client_id = client_id
131+
self.client_secret = client_secret
132+
133+
def set_headers(self, headers: Dict[str, str]) -> None:
134+
"""Headers to be sent with every CP4D token request.
135+
136+
Args:
137+
headers: Headers to be sent with every IAM token request.
138+
"""
139+
if isinstance(headers, dict):
140+
self.headers = headers
141+
else:
142+
raise TypeError('headers must be a dictionary')
143+
144+
def _save_token_info(self, token_response: dict) -> None:
145+
super()._save_token_info(token_response)
146+
147+
self.refresh_token = token_response.get("refresh_token")
148+
149+
def set_proxies(self, proxies: Dict[str, str]) -> None:
150+
"""Sets the proxies the token manager will use to communicate with IAM on behalf of the host.
151+
152+
Args:
153+
proxies: Proxies to use for communicating with IAM.
154+
proxies.http (str, optional): The proxy endpoint to use for HTTP requests.
155+
proxies.https (str, optional): The proxy endpoint to use for HTTPS requests.
156+
"""
157+
if isinstance(proxies, dict):
158+
self.proxies = proxies
159+
else:
160+
raise TypeError('proxies must be a dictionary')
161+
162+
def set_scope(self, value: str) -> None:
163+
"""Sets the "scope" parameter to use when fetching the bearer token from the IAM token server.
164+
165+
Args:
166+
value: A space seperated string that makes up the scope parameter.
167+
"""
168+
self.scope = value

ibm_cloud_sdk_core/iam_token_manager.py

Lines changed: 10 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
# limitations under the License.
1616

1717
from typing import Dict, Optional
18-
from .jwt_token_manager import JWTTokenManager
1918

20-
# pylint: disable=too-many-instance-attributes
21-
class IAMTokenManager(JWTTokenManager):
19+
from .iam_request_based_token_manager import IAMRequestBasedTokenManager
20+
21+
class IAMTokenManager(IAMRequestBasedTokenManager):
2222
"""The IAMTokenManager takes an api key and performs the necessary interactions with
2323
the IAM token service to obtain and store a suitable bearer token. Additionally, the IAMTokenManager
24-
will retrieve bearer tokens via basic auth using a supplied client_id and client_secret pair.
25-
2624
If the current stored bearer token has expired a new bearer token will be retrieved.
2725
2826
Attributes:
@@ -60,13 +58,6 @@ class IAMTokenManager(JWTTokenManager):
6058
scope: The "scope" to use when fetching the bearer token from the IAM token server.
6159
This can be used to obtain an access token with a specific scope.
6260
"""
63-
DEFAULT_IAM_URL = 'https://iam.cloud.ibm.com'
64-
CONTENT_TYPE = 'application/x-www-form-urlencoded'
65-
OPERATION_PATH = "/identity/token"
66-
REQUEST_TOKEN_GRANT_TYPE = 'urn:ibm:params:oauth:grant-type:apikey'
67-
REQUEST_TOKEN_RESPONSE_TYPE = 'cloud_iam'
68-
TOKEN_NAME = 'access_token'
69-
SCOPE = 'scope'
7061

7162
def __init__(self,
7263
apikey: str,
@@ -78,103 +69,13 @@ def __init__(self,
7869
headers: Optional[Dict[str, str]] = None,
7970
proxies: Optional[Dict[str, str]] = None,
8071
scope: Optional[str] = None) -> None:
81-
self.apikey = apikey
82-
if not url:
83-
url = self.DEFAULT_IAM_URL
84-
if url.endswith(self.OPERATION_PATH):
85-
url = url[:-len(self.OPERATION_PATH)]
86-
self.url = url
87-
self.client_id = client_id
88-
self.client_secret = client_secret
89-
self.headers = headers
90-
self.refresh_token = None
91-
self.proxies = proxies
92-
self.scope = scope
9372
super().__init__(
94-
self.url, disable_ssl_verification=disable_ssl_verification, token_name=self.TOKEN_NAME)
95-
96-
def request_token(self) -> dict:
97-
"""Request an IAM OAuth token given an API Key.
98-
99-
If client_id and client_secret are specified use their values as a user and pass auth set
100-
according to WHATWG url spec.
101-
102-
Returns:
103-
A dictionary containing the bearer token to be subsequently used service requests.
104-
"""
105-
headers = {
106-
'Content-type': self.CONTENT_TYPE,
107-
'Accept': 'application/json'
108-
}
109-
if self.headers is not None and isinstance(self.headers, dict):
110-
headers.update(self.headers)
111-
112-
data = {
113-
'grant_type': self.REQUEST_TOKEN_GRANT_TYPE,
114-
'apikey': self.apikey,
115-
'response_type': self.REQUEST_TOKEN_RESPONSE_TYPE
116-
}
117-
118-
if self.scope is not None and self.scope:
119-
data[self.SCOPE] = self.scope
120-
121-
auth_tuple = None
122-
# If both the client_id and secret were specified by the user, then use them
123-
if self.client_id and self.client_secret:
124-
auth_tuple = (self.client_id, self.client_secret)
125-
126-
response = self._request(
127-
method='POST',
128-
url=(self.url + self.OPERATION_PATH) if self.url else self.url,
129-
headers=headers,
130-
data=data,
131-
auth_tuple=auth_tuple,
132-
proxies=self.proxies)
133-
return response
73+
url=url, client_id=client_id, client_secret=client_secret,
74+
disable_ssl_verification=disable_ssl_verification, headers=headers, proxies=proxies, scope=scope)
13475

135-
def set_client_id_and_secret(self, client_id: str, client_secret: str) -> None:
136-
"""Set the client_id and client_secret.
137-
138-
Args:
139-
client_id: The client id to be used for token requests.
140-
client_secret: The client secret to be used for token requests.
141-
"""
142-
self.client_id = client_id
143-
self.client_secret = client_secret
144-
145-
def set_headers(self, headers: Dict[str, str]) -> None:
146-
"""Headers to be sent with every CP4D token request.
147-
148-
Args:
149-
headers: Headers to be sent with every IAM token request.
150-
"""
151-
if isinstance(headers, dict):
152-
self.headers = headers
153-
else:
154-
raise TypeError('headers must be a dictionary')
155-
156-
def _save_token_info(self, token_response: dict) -> None:
157-
super()._save_token_info(token_response)
158-
159-
self.refresh_token = token_response.get("refresh_token")
160-
161-
def set_proxies(self, proxies: Dict[str, str]) -> None:
162-
"""Sets the proxies the token manager will use to communicate with IAM on behalf of the host.
163-
164-
Args:
165-
proxies: Proxies to use for communicating with IAM.
166-
proxies.http (str, optional): The proxy endpoint to use for HTTP requests.
167-
proxies.https (str, optional): The proxy endpoint to use for HTTPS requests.
168-
"""
169-
if isinstance(proxies, dict):
170-
self.proxies = proxies
171-
else:
172-
raise TypeError('proxies must be a dictionary')
173-
174-
def set_scope(self, value: str) -> None:
175-
"""Sets the "scope" parameter to use when fetching the bearer token from the IAM token server.
76+
self.apikey = apikey
17677

177-
Args:
178-
value: A space seperated string that makes up the scope parameter.
179-
"""
180-
self.scope = value
78+
# Set API key related data.
79+
self.request_payload['grant_type'] = 'urn:ibm:params:oauth:grant-type:apikey'
80+
self.request_payload['apikey'] = self.apikey
81+
self.request_payload['response_type'] = 'cloud_iam'

0 commit comments

Comments
 (0)