20
20
21
21
import os
22
22
import pathlib
23
- import attr
24
23
import logging
24
+ from typing import Dict , List , Optional , Tuple
25
+ import attr
25
26
26
27
from six .moves .urllib .parse import urlparse
27
28
from six .moves .urllib .request import url2pathname
28
- from typing import Dict , List , Optional , Tuple
29
29
30
30
from sagemaker import s3
31
31
from sagemaker .job import _Job
@@ -1331,6 +1331,7 @@ def _pre_init_normalization(
1331
1331
image_uri : Optional [str ] = None ,
1332
1332
base_job_name : Optional [str ] = None ,
1333
1333
) -> Tuple [str , str ]:
1334
+ """Normalize job name and container image uri."""
1334
1335
# Normalize base_job_name
1335
1336
if base_job_name is None :
1336
1337
base_job_name = self .estimator_cls ._framework_name
@@ -1473,7 +1474,7 @@ def run( # type: ignore[override]
1473
1474
desired_s3_uri = f"{ self .s3_prefix } /{ job_name } /source/runproc.sh" ,
1474
1475
sagemaker_session = self .sagemaker_session ,
1475
1476
)
1476
- logger .info ("runproc.sh uploaded to" , s3_runproc_sh )
1477
+ logger .info ("runproc.sh uploaded to %s " , s3_runproc_sh )
1477
1478
1478
1479
# Submit a processing job.
1479
1480
super ().run (
@@ -1496,6 +1497,7 @@ def _upload_payload(
1496
1497
git_config : Optional [Dict [str , str ]],
1497
1498
job_name : str ,
1498
1499
) -> "sagemaker.estimator.Framework" : # type: ignore[name-defined] # noqa: F821
1500
+ """Upload payload sourcedir.tar.gz to S3."""
1499
1501
# A new estimator instance is required, because each call to ScriptProcessor.run() can
1500
1502
# use different code.
1501
1503
estimator = self .estimator_cls (
@@ -1505,8 +1507,8 @@ def _upload_payload(
1505
1507
git_config = git_config ,
1506
1508
framework_version = self .framework_version ,
1507
1509
py_version = self .py_version ,
1508
- code_location = self .s3_prefix , # Estimator will use <code_location >/jobname/output/source.tar.gz
1509
- enable_network_isolation = False , # If true, estimator uploads to input channel. Not what we want!
1510
+ code_location = self .s3_prefix , # Upload to <code_loc >/jobname/output/source.tar.gz
1511
+ enable_network_isolation = False , # If true, uploads to input channel. Not what we want!
1510
1512
image_uri = self .image_uri , # The image uri is already normalized by this point.
1511
1513
role = self .role ,
1512
1514
instance_type = self .instance_type ,
@@ -1526,6 +1528,10 @@ def _upload_payload(
1526
1528
return estimator
1527
1529
1528
1530
def _patch_inputs_with_payload (self , inputs , s3_payload ) -> List [ProcessingInput ]:
1531
+ """Add payload sourcedir.tar.gz to processing input.
1532
+
1533
+ This method follows the same mechanism in ScriptProcessor.
1534
+ """
1529
1535
# ScriptProcessor job will download only s3://..../code/runproc.sh, hence we need to also
1530
1536
# inject our s3://.../sourcedir.tar.gz.
1531
1537
#
@@ -1534,7 +1540,8 @@ def _patch_inputs_with_payload(self, inputs, s3_payload) -> List[ProcessingInput
1534
1540
# /opt/ml/processing/input/code/payload/. Note that sourcedir.tar.gz cannot go to
1535
1541
# /opt/ml/processing/input/code because the ScriptProcessor has first-right-to-use. See:
1536
1542
# - ScriptProcessor._CODE_CONTAINER_BASE_PATH, ScriptProcessor._CODE_CONTAINER_INPUT_NAME.
1537
- # - https://github.com/aws/sagemaker-python-sdk/blob/a7399455f5386d83ddc5cb15c0db00c04bd518ec/src/sagemaker/processing.py#L425-L426)
1543
+ # - https://github.com/aws/sagemaker-python-sdk/blob/ \
1544
+ # a7399455f5386d83ddc5cb15c0db00c04bd518ec/src/sagemaker/processing.py#L425-L426
1538
1545
if inputs is None :
1539
1546
inputs = []
1540
1547
inputs .append (
0 commit comments