
Commit cbebcaa

Replace aioboto3 with boto3 (aws#80)
* Remove aioboto3
* Make sync the main handler
* Remove aioboto3 completely, add tests
* Remove perf test from pytest
* Update setup.py
* Import aioboto3 and update async handler to handle range with no length
* Add s3transfer integration
* Add benchmark numbers
* Use multiprocessing
* Benchmark with and without multiprocessing
* remove results of perf test
* Convert class to static methods, so it works with py3.6
* Remove async handler
* Add perf test back for now
* Use multiprocessing spawn on Mac
* Only use MP if len(object_requests)>=100
* Make path a kwarg
* Change multiprocessing multiple to 2
* Rename tornasole to smdebug-testing
* Fix delete test
1 parent 4cfc85e commit cbebcaa
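
The gist of the change: synchronous boto3 calls replace the aioboto3 event loop, and large batches of S3 reads are parallelized with multiprocessing instead of asyncio. Below is a minimal sketch of that pattern, assuming illustrative names; the `ReadObjectRequest` fields and the `_download_one` helper are stand-ins, not the actual smdebug internals.

from dataclasses import dataclass
from multiprocessing import cpu_count, get_context

import boto3


@dataclass
class ReadObjectRequest:
    # Illustrative stand-in for smdebug's request object.
    bucket: str
    key: str
    start: int = None
    length: int = None


def _download_one(req):
    # Each worker builds its own client; boto3 clients are not safely
    # shareable across processes.
    s3 = boto3.client("s3")
    kwargs = {}
    if req.start is not None:
        # An open-ended range ("bytes=start-") covers the "range with no
        # length" case called out in the commit message.
        end = "" if req.length is None else str(req.start + req.length - 1)
        kwargs["Range"] = "bytes={}-{}".format(req.start, end)
    return s3.get_object(Bucket=req.bucket, Key=req.key, **kwargs)["Body"].read()


def get_objects(object_requests):
    # Per the commit message: multiprocessing only when there are at least
    # 100 requests, using the "spawn" start method (needed on Mac), with a
    # pool sized at 2x the CPU count ("Change multiprocessing multiple to 2"
    # -- the exact sizing here is an assumption).
    if len(object_requests) >= 100:
        with get_context("spawn").Pool(2 * cpu_count()) as pool:
            return pool.map(_download_one, object_requests)
    return [_download_one(r) for r in object_requests]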

File tree

8 files changed: +306 -398 lines


setup.py

Lines changed: 1 addition & 13 deletions
@@ -10,19 +10,7 @@
 CURRENT_VERSION = __version__
 FRAMEWORKS = ["tensorflow", "pytorch", "mxnet", "xgboost"]
 TESTS_PACKAGES = ["pytest", "torchvision", "pandas"]
-INSTALL_REQUIRES = [
-    # aiboto3 implicitly depends on aiobotocore
-    "aioboto3==6.4.1",  # no version deps
-    "aiobotocore==0.11.0",  # pinned to a specific botocore & boto3
-    "aiohttp>=3.6.0,<4.0",  # aiobotocore breaks with 4.0
-    # boto3 explicitly depends on botocore
-    "boto3>=1.10.32",  # Sagemaker requires >= 1.9.213
-    "botocore>=1.13.32",
-    "nest_asyncio",
-    "protobuf>=3.6.0",
-    "numpy",
-    "packaging",
-]
+INSTALL_REQUIRES = ["protobuf>=3.6.0", "numpy", "packaging", "boto3>=1.10.32"]
 
 
 def compile_summary_protobuf():

smdebug/core/access_layer/s3handler.py

Lines changed: 190 additions & 191 deletions
Large diffs are not rendered by default.
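
Since the full s3handler.py diff is collapsed, here is an approximation of the new static-method surface, reconstructed from the call sites in the files below: the instance methods become static methods backed by a plain boto3 client. The attribute names on the request objects (`bucket`, `prefix`) are assumptions, not the verbatim implementation.

import boto3


class S3Handler:
    @staticmethod
    def list_prefixes(list_requests):
        # One list of keys per ListRequest, matching how callers below
        # index the result with [0].
        s3 = boto3.client("s3")
        results = []
        for req in list_requests:
            keys = []
            paginator = s3.get_paginator("list_objects_v2")
            for page in paginator.paginate(Bucket=req.bucket, Prefix=req.prefix):
                keys.extend(obj["Key"] for obj in page.get("Contents", []))
            results.append(keys)
        return results

    @staticmethod
    def delete_prefix(delete_request):
        # List everything under the prefix, then delete key by key.
        s3 = boto3.client("s3")
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(
            Bucket=delete_request.bucket, Prefix=delete_request.prefix
        ):
            for obj in page.get("Contents", []):
                s3.delete_object(Bucket=delete_request.bucket, Key=obj["Key"])

    @staticmethod
    def delete_prefixes(delete_requests):
        for req in delete_requests:
            S3Handler.delete_prefix(delete_request=req)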

smdebug/core/access_layer/utils.py

Lines changed: 8 additions & 23 deletions
@@ -1,16 +1,14 @@
 # Standard Library
-import asyncio
 import os
 
 # Third Party
-import aioboto3
 from botocore.exceptions import ClientError
 
 # First Party
-from smdebug.core.access_layer.s3handler import ListRequest, S3Handler
+from smdebug.core.access_layer.s3handler import DeleteRequest, ListRequest, S3Handler
 from smdebug.core.logger import get_logger
 from smdebug.core.sagemaker_utils import is_sagemaker_job
-from smdebug.core.utils import get_region, is_s3
+from smdebug.core.utils import is_s3
 
 # Local
 from .file import TSAccessFile
@@ -55,9 +53,8 @@ def has_training_ended(trial_prefix):
     s3, bucket_name, key_name = is_s3(file_path)
     if s3:
         try:
-            s3_handler = S3Handler()
             request = ListRequest(bucket_name, key_name)
-            file_available = s3_handler.list_prefixes([request])[0]
+            file_available = S3Handler.list_prefixes([request])[0]
             if len(file_available) > 0:
                 return True
             else:
@@ -74,23 +71,12 @@ def has_training_ended(trial_prefix):
 
 
 def delete_s3_prefixes(bucket, keys):
-    s3_handler = S3Handler()
     if not isinstance(keys, list):
         keys = [keys]
-    list_prefixes = s3_handler.list_prefixes(
-        [ListRequest(Bucket=bucket, Prefix=key) for key in keys]
-    )
-    prefixes = [item for sublist in list_prefixes for item in sublist]
-    loop = asyncio.get_event_loop()
-
-    async def del_folder(bucket, keys):
-        loop = asyncio.get_event_loop()
-        client = aioboto3.client("s3", loop=loop, region_name=get_region())
-        await asyncio.gather(*[client.delete_object(Bucket=bucket, Key=key) for key in keys])
-        await client.close()
-
-    task = loop.create_task(del_folder(bucket, prefixes))
-    loop.run_until_complete(task)
+    delreqs = []
+    for key in keys:
+        delreqs.append(DeleteRequest(bucket, key))
+    S3Handler.delete_prefixes(delreqs)
 
 
 def check_dir_exists(path):
@@ -99,9 +85,8 @@ def check_dir_exists(path):
     s3, bucket_name, key_name = is_s3(path)
     if s3:
         try:
-            s3_handler = S3Handler()
             request = ListRequest(bucket_name, key_name)
-            folder = s3_handler.list_prefixes([request])[0]
+            folder = S3Handler.list_prefixes([request])[0]
             if len(folder) > 0 and has_training_ended(folder[-1]):
                 raise RuntimeError(
                     "The path:{} already exists on s3. "

smdebug/core/index_reader.py

Lines changed: 2 additions & 4 deletions
@@ -256,8 +256,6 @@ def __init__(self, path):
         super().__init__(path)
         self.path = path
         _, self.bucket_name, self.prefix_name = is_s3(path)
-        self.s3_handler = S3Handler()
-
         self.index_file_cache = ReadIndexFilesCache()
 
     def _is_event_file_present(self, file):
@@ -274,7 +272,7 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray:
         start = tensor_location.start_idx
         length = tensor_location.length
         request = [ReadObjectRequest(event_file_name, int(start), int(length))]
-        res = self.s3_handler.get_objects(request)
+        res = S3Handler.get_objects(request)
         tr = TensorReader(res[0])  # Access the only element in res
         tensor_tuple = list(tr.read_tensors())[0]  # Access the only element in the list
         tensor_name, step, tensor_data, mode, mode_step = tensor_tuple
@@ -323,7 +321,7 @@ def read_index_files(
             )
             self.index_file_cache.add(index_file, start_after_key)
 
-        responses = self.s3_handler.get_objects(object_requests)
+        responses = S3Handler.get_objects(object_requests)
         return responses, steps, start_after_key, workers
 
     def list_index_files(self, start_after_key=None):
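
Note that fetch_tensor_value above issues a ranged read: ReadObjectRequest carries a byte offset and length, so only the tensor's slice of the event file is downloaded. A usage sketch, with a placeholder path and offsets:

from smdebug.core.access_layer.s3handler import ReadObjectRequest, S3Handler

# Read 512 bytes starting at offset 1024 from one event file.
req = ReadObjectRequest("s3://my-bucket/prefix/events.tfevents", 1024, 512)
data = S3Handler.get_objects([req])[0]  # raw bytes for that range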

smdebug/core/s3_utils.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 
 
 # list_info will be a list of ListRequest objects. Returns list of lists of files for each request
 def _list_s3_prefixes(list_info):
-    files = S3Handler().list_prefixes(list_info)
+    files = S3Handler.list_prefixes(list_info)
     if len(files) == 1:
         files = files[0]
     return files
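
For reference, each ListRequest yields its own list of keys, which is why a single-request call is unwrapped with [0]. A sketch with placeholder bucket and prefix values:

from smdebug.core.access_layer.s3handler import ListRequest, S3Handler

# One request in, one list of keys out; _list_s3_prefixes flattens this case.
files = S3Handler.list_prefixes([ListRequest(Bucket="my-bucket", Prefix="logs/")])[0]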

smdebug/trials/s3_trial.py

Lines changed: 1 addition & 2 deletions
@@ -44,7 +44,6 @@ def __init__(
         self.prefix_name = os.path.join(prefix_name, "")
         self.path = "s3://" + os.path.join(self.bucket_name, self.prefix_name)
         self.index_reader = S3IndexReader(self.path)
-        self.s3_handler = S3Handler()
         self._load_collections()
         self._load_tensors()
 
@@ -67,7 +66,7 @@ def _read_collections(self, collection_files):
         first_collection_file = collection_files[0]  # First Collection File
         key = os.path.join(first_collection_file)
         collections_req = ReadObjectRequest(self._get_s3_location(key))
-        obj_data = self.s3_handler.get_objects([collections_req])[0]
+        obj_data = S3Handler.get_objects([collections_req])[0]
         obj_data = obj_data.decode("utf-8")
         self.collection_manager = CollectionManager.load_from_string(obj_data)
         self.num_workers = self.collection_manager.get_num_workers()

tests/analysis/utils.py

Lines changed: 2 additions & 17 deletions
@@ -1,13 +1,11 @@
 # Standard Library
-import asyncio
 import os
 
 # Third Party
-import aioboto3
 import numpy as np
 
 # First Party
-from smdebug.core.access_layer.s3handler import ListRequest, S3Handler
+from smdebug.core.access_layer.s3handler import DeleteRequest, S3Handler
 from smdebug.core.collection_manager import CollectionManager
 from smdebug.core.config_constants import DEFAULT_COLLECTIONS_FILE_NAME
 from smdebug.core.writer import FileWriter
@@ -51,18 +49,5 @@ def check_trial(trial_obj, num_steps, num_tensors):
     assert v is not None
 
 
-async def del_prefix_helper(bucket, keys):
-    loop = asyncio.get_event_loop()
-    client = aioboto3.client("s3", loop=loop)
-    await asyncio.gather(*[client.delete_object(Bucket=bucket, Key=key) for key in keys])
-    await client.close()
-
-
 def delete_s3_prefix(bucket, prefix):
-    s3_handler = S3Handler()
-    list_req = [ListRequest(Bucket=bucket, Prefix=prefix)]
-    keys = s3_handler.list_prefixes(list_req)[0]
-
-    loop = asyncio.get_event_loop()
-    task = loop.create_task(del_prefix_helper(bucket, keys))
-    loop.run_until_complete(task)
+    S3Handler.delete_prefix(delete_request=DeleteRequest(Bucket=bucket, Prefix=prefix))
