
Commit 3cfa8d5

Ability to save shapes (aws#341)
* WIP saveshape
* Add shape writer
* Add pytorch test
* Add untested keras test
* fix syntax
* fix syntax
* Import
* Import
* Add tests for TF
* Simplify read code
* Add read API and tests
* Add mxnet test
* Add s3 and json tests
* lint
* Fix payload
* fix import
* Handle different num tensors for losses
* Fix exact equal condition
* Fix mode bug
* trigger CI
* Add support for distributed training with writer map
* Check that value throws exception
* Fix tests to make them more resilient
* Fix mxnet and pytorch tests
* Remove tensor names
* pre-commmit
* Fix get_mode
* Fix bug with old index files
* Fix keras test with names of tensors
* Set original name to None if tf_obj is None
* Fix mirrored test for cpu
* Add docs
* trigger CI
* Fix shape writer get
* Simplify by removing shape writer
* Cleanup
* Fix name of writer
* Addressed review comments
* trigger ci
* retrigger CI

Co-authored-by: NihalHarish <[email protected]>
1 parent 3e463af commit 3cfa8d5

24 files changed: +637 −209 lines

docs/analysis.md

Lines changed: 35 additions & 5 deletions
@@ -30,8 +30,10 @@ This page describes the programming model that SageMaker Debugger provides for y
 * [steps](#steps-1)
 * [value](#value)
 * [reduction_value](#reduction_value)
-* [reduction_values](#reduction_values)
+* [shape](#shape)
 * [values](#values)
+* [reduction_values](#reduction_values)
+* [shapes](#shapes)
 * [workers](#workers-1)
 * [prev_steps](#prev_steps)
 * [Rules](#Rules)

@@ -356,6 +358,34 @@ trial.tensor(name).reduction_value(step_num, reduction_name,
 ###### Returns
 `numpy.ndarray` The reduction value of the tensor at the given step and worker (if the training job saved data from multiple workers) as a 1x1 numpy array. If this reduction was saved for the tensor during training through the reduction config, it is loaded and returned. If the given reduction was not saved but the full tensor was, the reduction is computed on the fly and returned. If neither the chosen reduction nor the full tensor is available, this method raises the `TensorUnavailableForStep` exception.

+#### shape
+Get the shape of the chosen tensor at a particular step.
+
+```python
+trial.tensor(name).shape(step_num, mode=modes.GLOBAL, worker=None)
+```
+###### Arguments
+- `step_num (int)` The step number whose shape is to be returned for the mode passed through the next parameter.
+- `mode (smdebug.modes enum value)` The mode applicable for the step number passed above. Defaults to `modes.GLOBAL`.
+- `worker (str)` This parameter is only applicable for distributed training. You can retrieve the shape of the tensor from a specific worker by passing the worker name. You can query all the workers seen by the trial with the `trial.workers()` method, and the workers which saved a value for the tensor at a specific step with `trial.tensor(name).workers(step, mode)`.
+
+###### Returns
+`tuple(int)` If the shape of this tensor was saved through the `save_shape` configuration in ReductionConfig, it is returned. If the full tensor was saved, the shape is computed from it and returned. If neither the shape nor the full tensor is available, this method raises the `TensorUnavailableForStep` exception.
+
+#### values
+Get the values of the tensor for all steps of a given mode.
+
+```python
+trial.tensor(name).values(mode=modes.GLOBAL, worker=None)
+```
+
+###### Arguments
+- `mode (smdebug.modes enum value)` The mode whose steps are to be queried. Defaults to `modes.GLOBAL`.
+- `worker (str)` This parameter is only applicable for distributed training. You can retrieve the values of the tensor from a specific worker by passing the worker name. You can query all the workers seen by the trial with the `trial.workers()` method, and the workers which saved a value for the tensor at a specific step with `trial.tensor(name).workers(step, mode)`.
+
+###### Returns
+`dict[int -> numpy.ndarray]` A dictionary with step numbers as keys and numpy arrays representing the value of the tensor as values.

 #### reduction_values
 Get all reduction values saved for the chosen tensor at a particular step. A reduction value is a tensor reduced to a single value through reduction or aggregation operations. See the description of the method `reduction_value` for more details.

@@ -372,19 +402,19 @@ trial.tensor(name).reduction_values(step_num, mode=modes.GLOBAL, worker=None)
 ###### Returns
 `dict[(str, bool) -> numpy.ndarray]` A dictionary whose keys are tuples of the form `(reduction_name, abs)` and whose values are 1x1 numpy arrays. `abs` is a boolean denoting whether the reduction was performed on the absolute value of the tensor. Note that this method only returns the reductions which were saved from the training job; it does not compute all known reductions when only the raw tensor was saved.

-#### values
-Get the values of the tensor for all steps of a given mode.
+#### shapes
+Get the shapes of the tensor for all steps of a given mode.

 ```python
-trial.tensor(name).values(mode=modes.GLOBAL, worker=None)
+trial.tensor(name).shapes(mode=modes.GLOBAL, worker=None)
 ```

 ###### Arguments
 - `mode (smdebug.modes enum value)` The mode whose steps are to be queried. Defaults to `modes.GLOBAL`.
 - `worker (str)` This parameter is only applicable for distributed training. You can retrieve the shapes of the tensor from a specific worker by passing the worker name. You can query all the workers seen by the trial with the `trial.workers()` method, and the workers which saved a value for the tensor at a specific step with `trial.tensor(name).workers(step, mode)`.

 ###### Returns
-`dict[int -> numpy.ndarray]` A dictionary with step numbers as keys and numpy arrays representing the value of the tensor as values.
+`dict[int -> tuple(int)]` A dictionary with step numbers as keys and tuples of ints representing the shapes of the tensor as values.

 #### workers
 Get all the workers for which this tensor was saved at a given step
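The documented return types of the new read APIs can be illustrated with a small stand-in. `MockTensor` below is a hypothetical sketch, not the smdebug implementation: it only mimics the contract that `shape(step)` returns a `tuple(int)` and `shapes()` returns a `dict[int -> tuple(int)]`.

```python
# Hypothetical stand-in (NOT the smdebug implementation) illustrating the
# documented return types of trial.tensor(name).shape / .shapes.
class MockTensor:
    def __init__(self, shapes_by_step):
        # shapes_by_step: dict mapping step number -> tuple of ints
        self._shapes = dict(shapes_by_step)

    def shape(self, step_num):
        # The real API raises TensorUnavailableForStep when neither the
        # shape nor the full tensor was saved; this sketch raises KeyError.
        return self._shapes[step_num]

    def shapes(self):
        # One entry per saved step, mirroring trial.tensor(name).shapes()
        return dict(self._shapes)
```

For example, a tensor saved at steps 0 and 1 with shape `(32, 10)` would yield `{0: (32, 10), 1: (32, 10)}` from `shapes()`.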

docs/api.md

Lines changed: 1 addition & 0 deletions
@@ -96,6 +96,7 @@ include_workers
 include_regex
 reductions
 save_raw_tensor
+save_shape
 save_interval
 save_steps
 start_step
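To show where the new `save_shape` flag sits among the other ReductionConfig fields, here is a hypothetical dataclass sketch; the real class lives in smdebug and may take additional parameters not listed in this diff.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical stand-in mirroring the ReductionConfig fields listed above.
# save_shape is the flag added by this commit: when True, only tensor
# shapes (not values) are recorded for the collection.
@dataclass
class ReductionConfigSketch:
    reductions: Optional[List[str]] = None
    save_raw_tensor: bool = False
    save_shape: bool = False

# A collection configured to save shapes only
cfg = ReductionConfigSketch(save_shape=True)
```

The flag defaults to `False`, so existing configs that never mention shapes are unaffected.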

smdebug/core/hook.py

Lines changed: 45 additions & 11 deletions
@@ -420,6 +420,17 @@ def _prepare_collections(self):
         self.prepared_collections = True

     #### End of Save Manager methods ####
+    @staticmethod
+    def _close_given_writer_map(writer_dict):
+        # Delete all the dist training writers
+        to_delete_writers = []
+        for key, writer in writer_dict.items():
+            # close calls flush
+            writer.close()
+            to_delete_writers.append(key)
+
+        for key in to_delete_writers:
+            del writer_dict[key]

     def _close_writers(self) -> None:
         if self.dry_run:

@@ -433,16 +444,7 @@ def _close_writers(self) -> None:
             self.writer.close()
             self.writer = None

-        to_delete_writers = []
-
-        # Delete all the tb writers
-        for mode, writer in self.tb_writers.items():
-            if writer is not None:
-                writer.flush()
-                writer.close()
-                to_delete_writers.append(mode)
-        for mode in to_delete_writers:
-            del self.tb_writers[mode]
+        self._close_given_writer_map(self.tb_writers)

     def _initialize_writers(self, only_initialize_if_missing=False) -> None:
         # Function is overridden in smdebug/tensorflow/base_hook.py

@@ -470,8 +472,12 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None:
         if self.save_all_workers is False:
             if self.worker != self.chief_worker:
                 return
+
         self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker)

+    def _get_main_writer(self) -> List[FileWriter]:
+        return [self.writer] if self.writer else []
+
     def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]:
         """
         :param tensor_name:

@@ -480,7 +486,7 @@ def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]:
         """
         if self.save_all_workers is False and self.worker != self.chief_worker:
             return []
-        return [self.writer] if self.writer else []
+        return self._get_main_writer()

     def _maybe_get_tb_writer(self) -> Optional[FileWriter]:
         """ Returns a FileWriter object if `hook.tensorboard_dir` has been specified, else None.

@@ -749,6 +755,31 @@ def _write_raw_tensor(self, tensor_name, tensor_value, save_collections, tensor_
                 self._write_raw_tensor_simple(tensor_name, tensor_value, tensor_ref=tensor_ref)
                 break

+    def _write_shape(self, tensor_name, tensor_value, save_collections, tensor_ref=None):
+        writers = self._get_writers(tensor_name, tensor_ref=tensor_ref)
+        for s_col in save_collections:
+            reduction_config = s_col.reduction_config
+            if self.dry_run is False and reduction_config.save_shape is True:
+                numpy_tensor_value = self._make_numpy_array(tensor_value)
+                this_size, this_shape = size_and_shape(numpy_tensor_value)
+                # In TF Keras, and for Variables in all TF interfaces, we sometimes
+                # output tensors with more meaningful names than the original name.
+                # Output both the smdebug-given name and the original name in such cases.
+                if tensor_ref is not None and tensor_ref.tf_obj is not None:
+                    original_name = tensor_ref.tf_obj.name
+                else:
+                    original_name = None
+
+                for writer in writers:
+                    writer.write_shape(
+                        tensor_name,
+                        this_shape,
+                        self.mode,
+                        self.mode_steps[self.mode],
+                        original_name=original_name,
+                    )
+                break

     def _write_raw_tensor_simple(self, tensor_name, tensor_value, tensor_ref=None, timestamp=None):
         # tensor_ref is used by TF
         # todo: if fp16, check perf of saving as fp16 in proto vs as fp32

@@ -828,6 +859,9 @@ def _write_for_tensor(self, tensor_name, tensor_value, save_collections, tensor_
         :param save_collections: list of collections which are being saved for this step
         """
         self._log_save(tensor_name, save_collections)
+
+        self._write_shape(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref)
+
         # write reductions defined for collections this tensor may be part of
         self._write_reductions(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref)
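The refactor above extracts the tb-writer cleanup into `_close_given_writer_map`. The pattern is worth a standalone sketch (with a stub writer in place of `FileWriter`, which is an assumption here): collect keys during iteration and delete them afterwards, because mutating a dict while iterating over it raises `RuntimeError`.

```python
# Stub with the one method the pattern needs; the real FileWriter's close()
# also flushes, which is why no separate flush() call is required.
class StubWriter:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def close_given_writer_map(writer_dict):
    # First pass: close every writer, remembering its key
    to_delete_writers = []
    for key, writer in writer_dict.items():
        writer.close()
        to_delete_writers.append(key)
    # Second pass: delete entries outside the iteration over writer_dict,
    # since deleting during that iteration would raise RuntimeError
    for key in to_delete_writers:
        del writer_dict[key]
```

After the call, the map is empty and every writer it held has been closed, which is exactly what `_close_writers` needs for `self.tb_writers`.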

smdebug/core/index_reader.py

Lines changed: 47 additions & 53 deletions
@@ -16,7 +16,7 @@
     MISSING_EVENT_FILE_RETRY_LIMIT,
     MISSING_EVENT_FILE_RETRY_LIMIT_KEY,
 )
-from smdebug.core.locations import IndexFileLocationUtils, TensorLocation
+from smdebug.core.locations import IndexFileLocationUtils, TensorLocation, TensorShape
 from smdebug.core.logger import get_logger
 from smdebug.core.modes import ModeKeys
 from smdebug.core.s3_utils import list_s3_objects

@@ -120,12 +120,22 @@ def fetch_tensor_value(self, tensor_location: TensorLocation):
     def list_event_files(self, start_after_prefix):
         pass

-    @abstractmethod
     def load_tensor_data_from_index_files(
         self, start_after_key=None, range_steps=None
     ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]:
         """Return a triply nested dict referring to tensor data."""

+        responses, steps, last_index_token, workers = self.read_index_files(
+            start_after_key, range_steps
+        )
+
+        tensor_data = {}
+        for step, response, worker in zip(steps, responses, workers):
+            tensor_data = self._update_tensors_from_json(
+                tensor_data, step, response, self.path, worker
+            )
+        return tensor_data, last_index_token
+
     @abstractmethod
     def _is_event_file_present(self, file_name) -> bool:
         pass

@@ -203,8 +213,10 @@ def _validate(index_dict):
             raise IndexReaderException("meta section is not present")
         if len(index_dict["meta"]) == 0:
             raise IndexReaderException("meta section is empty")
-        if "tensor_payload" not in index_dict:
-            raise IndexReaderException("tensor_payload section is not present")
+        if "tensor_payload" not in index_dict and "shape_payload" not in index_dict:
+            raise IndexReaderException(
+                "neither tensor_payload nor shape_payload sections are present"
+            )

     def _update_tensors_from_json(
         self, index_tensors_dict, step, response: bytes, path, worker

@@ -233,28 +245,41 @@ def _update_tensors_from_json(
         mode = index_meta["mode"]
         mode = ModeKeys[mode.strip()]
         mode_step = index_meta["mode_step"]
-        event_file_name = os.path.join(path, index_meta["event_file_name"])
-        tensors = index_dict["tensor_payload"]
-        for tensor in tensors:
-            tensor_name = tensor["tensorname"]
-            start_idx = tensor["start_idx"]
-            length = tensor["length"]
-            tensor_location = TensorLocation(
-                tensor_name, mode, mode_step, event_file_name, start_idx, length, worker
-            )
+
+        to_update_index_dict = []
+
+        if "tensor_payload" in index_dict and len(index_dict["tensor_payload"]):
+            event_file_name = os.path.join(path, index_meta["event_file_name"])
+            for tensor in index_dict["tensor_payload"]:
+                tensor_name = tensor["tensorname"]
+                start_idx = tensor["start_idx"]
+                length = tensor["length"]
+                tensor_location = TensorLocation(
+                    tensor_name, mode, mode_step, event_file_name, start_idx, length, worker
+                )
+                to_update_index_dict.append((tensor_name, step, tensor_location))
+
+        if "shape_payload" in index_dict and len(index_dict["shape_payload"]):
+            for tensor in index_dict["shape_payload"]:
+                tensor_name = tensor["tensorname"]
+                original_name = tensor["originalname"]
+                shape = tensor["shape"]
+                ts = TensorShape(tensor_name, mode, mode_step, shape, original_name)
+                to_update_index_dict.append((tensor_name, step, ts))
+
+        for tu in to_update_index_dict:
+            tensor_name, step, obj = tu
+            if isinstance(obj, TensorLocation):
+                obj_dict = {"tensor_location": obj}
+            elif isinstance(obj, TensorShape):
+                obj_dict = {"tensor_shape": obj}
             if tensor_name in index_tensors_dict:
                 if step in index_tensors_dict[tensor_name]:
-                    index_tensors_dict[tensor_name][step].update(
-                        {worker: {"tensor_location": tensor_location}}
-                    )
+                    index_tensors_dict[tensor_name][step].update({worker: obj_dict})
                 else:
-                    index_tensors_dict[tensor_name].update(
-                        {step: {worker: {"tensor_location": tensor_location}}}
-                    )
+                    index_tensors_dict[tensor_name].update({step: {worker: obj_dict}})
             else:
-                index_tensors_dict[tensor_name] = {
-                    step: {worker: {"tensor_location": tensor_location}}
-                }
+                index_tensors_dict[tensor_name] = {step: {worker: obj_dict}}
         return index_tensors_dict

@@ -285,22 +310,6 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray:
         tensor_name, step, tensor_data, mode, mode_step = tensor_tuple
         return tensor_data

-    def load_tensor_data_from_index_files(
-        self, start_after_key=None, range_steps=None
-    ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]:
-        """Return a triply nested dict referring to tensor data."""
-
-        responses, steps, last_index_token, workers = self.read_index_files(
-            start_after_key, range_steps
-        )
-
-        tensor_data = {}
-        for step, response, worker in zip(steps, responses, workers):
-            tensor_data = self._update_tensors_from_json(
-                tensor_data, step, response, self.path, worker
-            )
-        return tensor_data, last_index_token
-
     def read_index_files(
         self, start_after_key: str, range_steps=None
     ) -> Tuple[List[bytes], list, str, List[str]]:

@@ -398,21 +407,6 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray:
         tensor_name, step, tensor_data, mode, mode_step = tensor_tuple
         return tensor_data

-    def load_tensor_data_from_index_files(
-        self, start_after_key=None, range_steps=None
-    ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]:
-        """Return a triply nested dict referring to tensor data."""
-
-        responses, steps, last_index_token, workers = self.read_index_files(
-            start_after_key, range_steps
-        )
-        tensor_data = {}
-        for step, response, worker in zip(steps, responses, workers):
-            tensor_data = self._update_tensors_from_json(
-                tensor_data, step, response, self.path, worker
-            )
-        return tensor_data, last_index_token
-
     def read_index_files(
         self, start_after_key: str, range_steps=None
     ) -> Tuple[List[bytes], list, str, List[str]]:
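The index dict built by `_update_tensors_from_json` is nested `tensor_name -> step -> worker -> payload`, where the payload is keyed `"tensor_location"` or `"tensor_shape"` depending on the entry type. A standalone sketch of that update logic (with plain dicts standing in for the `TensorLocation`/`TensorShape` objects, and the type dispatch reduced to an explicit `kind` argument) looks like this:

```python
def update_index(index_tensors_dict, tensor_name, step, worker, obj, kind):
    # kind is "tensor_location" or "tensor_shape", mirroring the
    # isinstance dispatch in _update_tensors_from_json
    obj_dict = {kind: obj}
    if tensor_name in index_tensors_dict:
        if step in index_tensors_dict[tensor_name]:
            # Same tensor and step seen before: add this worker's entry
            index_tensors_dict[tensor_name][step].update({worker: obj_dict})
        else:
            # Known tensor, new step
            index_tensors_dict[tensor_name].update({step: {worker: obj_dict}})
    else:
        # First time this tensor is seen
        index_tensors_dict[tensor_name] = {step: {worker: obj_dict}}
    return index_tensors_dict

# Two workers reporting a shape for the same tensor and step merge into
# one step entry, which is what makes distributed training work here.
d = {}
update_index(d, "loss", 0, "worker_0", {"shape": (1,)}, "tensor_shape")
update_index(d, "loss", 0, "worker_1", {"shape": (1,)}, "tensor_shape")
```

The three-way branch keeps per-worker entries merged under one step rather than overwriting them, which is the distributed-training case this commit's writer map feeds.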

smdebug/core/locations.py

Lines changed: 12 additions & 0 deletions
@@ -24,6 +24,18 @@ def to_dict(self):
         return {"tensorname": self.tensorname, "start_idx": self.start_idx, "length": self.length}


+class TensorShape:
+    def __init__(self, name, mode, mode_step, shape, original_name=None):
+        self.name = name
+        self.original_name = original_name if original_name is not None else name
+        self.mode = mode
+        self.mode_step = mode_step
+        self.shape = tuple(shape)
+
+    def to_dict(self):
+        return {"tensorname": self.name, "originalname": self.original_name, "shape": self.shape}
+
+
 STEP_NUMBER_FORMATTING_LENGTH = "012"
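The new `TensorShape` record is small enough to exercise on its own. The snippet below reproduces the class from the diff (minus its module context; the string mode value here is a stand-in for the real `ModeKeys` enum) to show the defaulting of `original_name` and the `to_dict` keys that the index reader expects in `shape_payload`.

```python
class TensorShape:
    def __init__(self, name, mode, mode_step, shape, original_name=None):
        self.name = name
        # Fall back to the smdebug-given name when no framework name exists
        self.original_name = original_name if original_name is not None else name
        self.mode = mode
        self.mode_step = mode_step
        self.shape = tuple(shape)

    def to_dict(self):
        # Key names match what _update_tensors_from_json reads from shape_payload
        return {"tensorname": self.name, "originalname": self.original_name, "shape": self.shape}

# A shape record as the hook would write it for a saved tensor
ts = TensorShape("gradients/dense", mode="TRAIN", mode_step=5, shape=[64, 128])
```

Note that `shape` is normalized to a tuple on construction, so list input from parsed JSON round-trips to the documented `tuple(int)` form.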
