@@ -184,9 +184,7 @@ def run(
         if wait:
             self.latest_job.wait(logs=logs)

-    def _extend_processing_args(
-        self, inputs, outputs, **kwargs
-    ):  # pylint: disable=W0613
+    def _extend_processing_args(self, inputs, outputs, **kwargs):  # pylint: disable=W0613
         """Extend inputs and outputs based on extra parameters"""
         return inputs, outputs

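The hook above is deliberately a no-op: subclasses override it to splice extra inputs or outputs into a job before it is created. A minimal sketch of such an override, assuming the SageMaker Python SDK is installed; `ExtraConfigProcessor` and the `extra_source` keyword are illustrative names, not part of the SDK:

from sagemaker.processing import ProcessingInput, Processor

class ExtraConfigProcessor(Processor):
    def _extend_processing_args(self, inputs, outputs, **kwargs):
        # Hypothetical keyword: mount an extra S3 prefix into the container.
        extra_source = kwargs.get("extra_source")
        if extra_source is not None:
            inputs = (inputs or []) + [
                ProcessingInput(
                    source=extra_source,
                    destination="/opt/ml/processing/extra",
                )
            ]
        return inputs, outputs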
@@ -288,22 +286,15 @@ def _normalize_inputs(self, inputs=None, kms_key=None):
         # Iterate through the provided list of inputs.
         for count, file_input in enumerate(inputs, 1):
             if not isinstance(file_input, ProcessingInput):
-                raise TypeError(
-                    "Your inputs must be provided as ProcessingInput objects."
-                )
+                raise TypeError("Your inputs must be provided as ProcessingInput objects.")
             # Generate a name for the ProcessingInput if it doesn't have one.
             if file_input.input_name is None:
                 file_input.input_name = "input-{}".format(count)

-            if (
-                isinstance(file_input.source, Properties)
-                or file_input.dataset_definition
-            ):
+            if isinstance(file_input.source, Properties) or file_input.dataset_definition:
                 normalized_inputs.append(file_input)
                 continue
-            if isinstance(
-                file_input.s3_input.s3_uri, (Parameter, Expression, Properties)
-            ):
+            if isinstance(file_input.s3_input.s3_uri, (Parameter, Expression, Properties)):
                 normalized_inputs.append(file_input)
                 continue
             # If the source is a local path, upload it to S3
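The naming rule in this loop is worth seeing in isolation: unnamed inputs are numbered by their position in the list, while explicit names are left untouched. A standalone sketch, plain Python with no SDK dependency (`FakeInput` is a stand-in for `ProcessingInput`):

class FakeInput:
    # Illustrative stand-in for ProcessingInput.
    def __init__(self, input_name=None):
        self.input_name = input_name

inputs = [FakeInput(), FakeInput("labels"), FakeInput()]
for count, file_input in enumerate(inputs, 1):
    # Only fill in a name when the caller didn't provide one.
    if file_input.input_name is None:
        file_input.input_name = "input-{}".format(count)

print([i.input_name for i in inputs])  # ['input-1', 'labels', 'input-3']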
@@ -349,9 +340,7 @@ def _normalize_outputs(self, outputs=None):
         # Iterate through the provided list of outputs.
         for count, output in enumerate(outputs, 1):
             if not isinstance(output, ProcessingOutput):
-                raise TypeError(
-                    "Your outputs must be provided as ProcessingOutput objects."
-                )
+                raise TypeError("Your outputs must be provided as ProcessingOutput objects.")
             # Generate a name for the ProcessingOutput if it doesn't have one.
             if output.output_name is None:
                 output.output_name = "output-{}".format(count)
@@ -563,9 +552,7 @@ def _include_code_in_inputs(self, inputs, code, kms_key=None):
         user_code_s3_uri = self._handle_user_code_url(code, kms_key)
         user_script_name = self._get_user_code_name(code)

-        inputs_with_code = self._convert_code_and_add_to_inputs(
-            inputs, user_code_s3_uri
-        )
+        inputs_with_code = self._convert_code_and_add_to_inputs(inputs, user_code_s3_uri)

         self._set_entrypoint(self.command, user_script_name)
         return inputs_with_code
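For orientation, the flow this method implements: upload the user's script to S3, add it to the job's inputs so it is mounted into the container, and point the entrypoint at the mounted copy. A simplified sketch of that flow; the helper name, dict shape, and mount path are assumptions for illustration, not the SDK's exact internals:

def include_code_in_inputs(inputs, code_s3_uri, script_name, command):
    # Expose the uploaded script to the container as one more input...
    code_input = {
        "InputName": "code",
        "S3Uri": code_s3_uri,
        "LocalPath": "/opt/ml/processing/input/code",  # assumed mount path
    }
    # ...and run it via the container command, e.g. ["python3"].
    entrypoint = command + ["/opt/ml/processing/input/code/" + script_name]
    return (inputs or []) + [code_input], entrypoint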
@@ -700,9 +687,7 @@ def _set_entrypoint(self, command, user_script_name):
 class ProcessingJob(_Job):
     """Provides functionality to start, describe, and stop processing jobs."""

-    def __init__(
-        self, sagemaker_session, job_name, inputs, outputs, output_kms_key=None
-    ):
+    def __init__(self, sagemaker_session, job_name, inputs, outputs, output_kms_key=None):
         """Initializes a Processing job.

         Args:
@@ -720,9 +705,7 @@ def __init__(
         self.inputs = inputs
         self.outputs = outputs
         self.output_kms_key = output_kms_key
-        super(ProcessingJob, self).__init__(
-            sagemaker_session=sagemaker_session, job_name=job_name
-        )
+        super(ProcessingJob, self).__init__(sagemaker_session=sagemaker_session, job_name=job_name)

     @classmethod
     def start_new(cls, processor, inputs, outputs, experiment_config):
@@ -743,9 +726,7 @@ def start_new(cls, processor, inputs, outputs, experiment_config):
             :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created
                 using the ``Processor``.
         """
-        process_args = cls._get_process_args(
-            processor, inputs, outputs, experiment_config
-        )
+        process_args = cls._get_process_args(processor, inputs, outputs, experiment_config)

         # Print the job name and the user's inputs and outputs as lists of dictionaries.
         print()
@@ -819,26 +800,18 @@ def _get_process_args(cls, processor, inputs, outputs, experiment_config):
         process_request_args["app_specification"] = {"ImageUri": processor.image_uri}
         if processor.arguments is not None:
-            process_request_args["app_specification"][
-                "ContainerArguments"
-            ] = processor.arguments
+            process_request_args["app_specification"]["ContainerArguments"] = processor.arguments
         if processor.entrypoint is not None:
-            process_request_args["app_specification"][
-                "ContainerEntrypoint"
-            ] = processor.entrypoint
+            process_request_args["app_specification"]["ContainerEntrypoint"] = processor.entrypoint

         process_request_args["environment"] = processor.env

         if processor.network_config is not None:
-            process_request_args[
-                "network_config"
-            ] = processor.network_config._to_request_dict()
+            process_request_args["network_config"] = processor.network_config._to_request_dict()
         else:
             process_request_args["network_config"] = None

-        process_request_args["role_arn"] = processor.sagemaker_session.expand_role(
-            processor.role
-        )
+        process_request_args["role_arn"] = processor.sagemaker_session.expand_role(processor.role)

         process_request_args["tags"] = processor.tags
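The pattern in this hunk is plain conditional request assembly: optional fields are attached only when the processor sets them. A standalone sketch of the `app_specification` portion; the helper name and image URI are illustrative, not SDK API:

def build_app_specification(image_uri, arguments=None, entrypoint=None):
    # Start from the one required field and attach optional ones only if set.
    app_spec = {"ImageUri": image_uri}
    if arguments is not None:
        app_spec["ContainerArguments"] = arguments
    if entrypoint is not None:
        app_spec["ContainerEntrypoint"] = entrypoint
    return app_spec

print(build_app_specification("my-image:latest", arguments=["--mode", "train"]))
# {'ImageUri': 'my-image:latest', 'ContainerArguments': ['--mode', 'train']}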
@@ -859,9 +832,7 @@ def from_processing_name(cls, sagemaker_session, processing_job_name):
             :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created
                 from the job name.
         """
-        job_desc = sagemaker_session.describe_processing_job(
-            job_name=processing_job_name
-        )
+        job_desc = sagemaker_session.describe_processing_job(job_name=processing_job_name)

         inputs = None
         if job_desc.get("ProcessingInputs"):
@@ -878,9 +849,9 @@ def from_processing_name(cls, sagemaker_session, processing_job_name):
             ]

         outputs = None
-        if job_desc.get("ProcessingOutputConfig") and job_desc[
-            "ProcessingOutputConfig"
-        ].get("Outputs"):
+        if job_desc.get("ProcessingOutputConfig") and job_desc["ProcessingOutputConfig"].get(
+            "Outputs"
+        ):
             outputs = []
             for processing_output_dict in job_desc["ProcessingOutputConfig"]["Outputs"]:
                 processing_output = ProcessingOutput(
@@ -892,12 +863,8 @@ def from_processing_name(cls, sagemaker_session, processing_job_name):
                 )

                 if "S3Output" in processing_output_dict:
-                    processing_output.source = processing_output_dict["S3Output"][
-                        "LocalPath"
-                    ]
-                    processing_output.destination = processing_output_dict["S3Output"][
-                        "S3Uri"
-                    ]
+                    processing_output.source = processing_output_dict["S3Output"]["LocalPath"]
+                    processing_output.destination = processing_output_dict["S3Output"]["S3Uri"]

                 outputs.append(processing_output)
         output_kms_key = None
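To see the defensive walk over the describe response in one runnable piece, here is a standalone sketch with a truncated, illustrative payload (not a verbatim API response):

job_desc = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": "output-1",
                "S3Output": {
                    "LocalPath": "/opt/ml/processing/output",
                    "S3Uri": "s3://my-bucket/my-prefix",  # illustrative
                },
            }
        ]
    }
}

outputs = []
# .get() guards both levels before the unguarded indexing below.
if job_desc.get("ProcessingOutputConfig") and job_desc["ProcessingOutputConfig"].get("Outputs"):
    for output_dict in job_desc["ProcessingOutputConfig"]["Outputs"]:
        outputs.append((output_dict["S3Output"]["LocalPath"], output_dict["S3Output"]["S3Uri"]))

print(outputs)  # [('/opt/ml/processing/output', 's3://my-bucket/my-prefix')]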
@@ -1122,9 +1089,7 @@ def _to_request_dict(self):
             self.s3_input.s3_compression_type == "Gzip"
             and self.s3_input.s3_input_mode != "Pipe"
         ):
-            raise ValueError(
-                "Data can only be gzipped when the input mode is Pipe."
-            )
+            raise ValueError("Data can only be gzipped when the input mode is Pipe.")

         s3_input_request["S3Input"] = S3Input.to_boto(self.s3_input)
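The constraint enforced here is that Gzip compression is accepted only together with Pipe input mode. A standalone sketch of just the check (the helper name is illustrative):

def validate_compression(s3_compression_type, s3_input_mode):
    # Mirrors the guard above: Gzip with any mode other than Pipe must raise.
    if s3_compression_type == "Gzip" and s3_input_mode != "Pipe":
        raise ValueError("Data can only be gzipped when the input mode is Pipe.")

validate_compression("Gzip", "Pipe")  # accepted
try:
    validate_compression("Gzip", "File")
except ValueError as err:
    print(err)  # Data can only be gzipped when the input mode is Pipe.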