|
12 | 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
13 | 13 | # See the License for the specific language governing permissions and
|
14 | 14 | # limitations under the License.
|
15 |
| - |
16 | 15 | """
|
17 |
| -Test the platform service Global Search API operations |
| 16 | +Integration Tests for GlobalSearchV2 |
18 | 17 | """
|
19 | 18 |
|
20 |
| -import unittest |
21 | 19 | import os
|
22 |
| -from ibm_platform_services import GlobalSearchV2 |
23 |
| -from ibm_cloud_sdk_core.authenticators import IAMAuthenticator |
24 |
| -from jproperties import Properties |
25 |
| - |
26 |
| -# Read config file |
27 |
| -configFile = 'ghost.env' |
28 |
| -config = {} |
29 |
| -configLoaded = None |
30 |
| - |
31 |
| -try: |
32 |
| - with open(configFile, "rb") as f: |
33 |
| - p = Properties() |
34 |
| - p.load(f, "utf-8") |
35 |
| - config['GST_API_URL'] = p['GST_API_URL'].data |
36 |
| - config['GST_TAGS_URL'] = p['GST_TAGS_URL'].data |
37 |
| - config['GST_RESOURCE_NAMES'] = p['GST_RESOURCE_NAMES'].data |
38 |
| - config['GST_IINTERNA_APIKEY'] = p['GST_IINTERNA_APIKEY'].data |
39 |
| - config['GST_IAM_URL'] = p['GST_IAM_URL'].data |
40 |
| - config['GST_QUERY'] = p['GST_QUERY'].data |
41 |
| - config['GST_RESOURCE_CRN'] = p['GST_RESOURCE_CRN'].data |
42 |
| - configLoaded = True |
43 |
| -except: |
44 |
| - print('External configuration was not found, skipping tests...') |
45 |
| - |
46 |
| -# Test class |
47 |
| -class TestGlobalSearchV2(unittest.TestCase): |
| 20 | +import pytest |
| 21 | +import uuid |
| 22 | +from ibm_cloud_sdk_core import * |
| 23 | +from ibm_platform_services.global_search_v2 import * |
| 24 | + |
| 25 | +# Config file name |
| 26 | +config_file = 'global_search.env' |
| 27 | + |
| 28 | +transaction_id = str(uuid.uuid4()) |
| 29 | + |
| 30 | + |
| 31 | +class TestGlobalSearchV2(): |
48 | 32 | """
|
49 | 33 | Integration Test Class for GlobalSearchV2
|
50 | 34 | """
|
| 35 | + @classmethod |
| 36 | + def setup_class(cls): |
| 37 | + if os.path.exists(config_file): |
| 38 | + os.environ['IBM_CREDENTIALS_FILE'] = config_file |
| 39 | + |
| 40 | + cls.global_search_service = GlobalSearchV2.new_instance() |
| 41 | + assert cls.global_search_service is not None |
| 42 | + |
| 43 | + print('Setup complete.') |
51 | 44 |
|
52 |
| - def setUp(self): |
53 |
| - if not configLoaded: |
54 |
| - self.skipTest("External configuration not available, skipping...") |
55 |
| - |
56 |
| - # Create authenticator with IAM API key (it generates bearer token automatically) |
57 |
| - apikey = config['GST_IINTERNA_APIKEY'] |
58 |
| - iam_url = config['GST_IAM_URL'] |
59 |
| - assert apikey is not None |
60 |
| - assert iam_url is not None |
61 |
| - authenticator = IAMAuthenticator(apikey, url=iam_url) |
62 |
| - |
63 |
| - self.global_search = GlobalSearchV2(authenticator=authenticator) |
64 |
| - self.global_search.set_service_url(config['GST_API_URL']) |
65 |
| - self.items = set(config['GST_RESOURCE_NAMES'].split(',')) |
66 |
| - |
67 |
| - def tearDown(self): |
68 |
| - # Delete the resources |
69 |
| - print("Clean up complete") |
70 |
| - |
71 |
| - def test_search_1(self): |
72 |
| - # It makes the query |
73 |
| - env = self.global_search.search(query='name:gst-sdk*', search_cursor=None, transaction_id=None) |
74 |
| - assert env is not None |
75 |
| - results = env.get_result() |
76 |
| - items = results.get('items') |
77 |
| - assert items is not None |
78 |
| - assert len(items) == 2 |
79 |
| - items_name_set = set() |
80 |
| - for item in items: |
81 |
| - items_name_set.add(item.get('name')) |
82 |
| - # It checks if the resultset and expected set are equal |
83 |
| - assert items_name_set == self.items |
84 |
| - |
85 |
| - def test_search_2(self): |
86 |
| - items_to_check = set(self.items) # Make a copy of the items in memory |
87 |
| - fields_to_search = ['crn', 'name'] |
88 |
| - # It makes the first query |
89 |
| - env = self.global_search.search(query='name:gst-sdk*', search_cursor=None, transaction_id=None, |
90 |
| - fields=fields_to_search, |
91 |
| - limit=1) |
92 |
| - assert env is not None |
93 |
| - results = env.get_result() |
94 |
| - items = results.get('items') |
95 |
| - assert items is not None |
96 |
| - assert len(items) == 1 |
97 |
| - items_to_check.remove(items[0]['name']) |
98 |
| - assert len(items_to_check) == 1 |
99 |
| - |
100 |
| - # It makes the second query with cursor |
101 |
| - search_cursor_str = results.get('search_cursor') |
102 |
| - env = self.global_search.search(query='name:gst-sdk*', search_cursor=search_cursor_str, transaction_id=None, |
103 |
| - fields=fields_to_search, |
104 |
| - limit=1) |
105 |
| - assert env is not None |
106 |
| - results = env.get_result() |
107 |
| - items = results.get('items') |
108 |
| - assert items is not None |
109 |
| - assert len(items) == 1 |
110 |
| - items_to_check.remove(items[0]['name']) |
111 |
| - assert len(items_to_check) == 0 |
| 45 | + needscredentials = pytest.mark.skipif( |
| 46 | + not os.path.exists(config_file), |
| 47 | + reason="External configuration not available, skipping...") |
112 | 48 |
|
| 49 | + @needscredentials |
| 50 | + def test_search(self): |
| 51 | + |
| 52 | + search_results = [] |
| 53 | + more_results = True |
| 54 | + search_cursor = None |
| 55 | + |
| 56 | + while more_results: |
| 57 | + search_response = self.global_search_service.search( |
| 58 | + query='GST-sdk-*', |
| 59 | + fields=['*'], |
| 60 | + search_cursor=search_cursor, |
| 61 | + transaction_id=transaction_id, |
| 62 | + limit=1) |
| 63 | + |
| 64 | + assert search_response.get_status_code() == 200 |
| 65 | + scan_result = search_response.get_result() |
| 66 | + assert scan_result is not None |
| 67 | + print('\nsearch() result: ', json.dumps(scan_result, indent=2)) |
| 68 | + |
| 69 | + if len(scan_result['items']) > 0: |
| 70 | + for elem in scan_result['items']: |
| 71 | + search_results.append(elem) |
| 72 | + search_cursor = scan_result['search_cursor'] |
| 73 | + else: |
| 74 | + more_results = False |
| 75 | + |
| 76 | + print('Total items returned by search(): ', len(search_results)) |
| 77 | + |
| 78 | + @needscredentials |
113 | 79 | def test_get_supported_types(self):
|
114 |
| - # It makes the query |
115 |
| - env = self.global_search.get_supported_types() |
116 |
| - assert env is not None |
117 |
| - results = env.get_result() |
118 |
| - supported_types = results.get('supported_types') |
119 |
| - assert supported_types is not None |
120 |
| - assert len(supported_types) > 0 |
121 | 80 |
|
| 81 | + get_supported_types_response = self.global_search_service.get_supported_types( |
| 82 | + ) |
| 83 | + |
| 84 | + assert get_supported_types_response.get_status_code() == 200 |
| 85 | + supported_types_list = get_supported_types_response.get_result() |
| 86 | + assert supported_types_list is not None |
122 | 87 |
|
123 |
| -if __name__ == '__main__': |
124 |
| - unittest.main() |
| 88 | + print('get_supported_types() result: ', |
| 89 | + json.dumps(supported_types_list, indent=2)) |
0 commit comments