@@ -10,157 +10,4 @@ is a platform that enables you to programmatically author, schedule, and monitor
you can build a workflow for SageMaker training, hyperparameter tuning, batch transform, and endpoint deployment.
You can use any SageMaker deep learning framework or Amazon algorithm to perform the above operations in Airflow.
- There are two ways to build a SageMaker workflow: using Airflow SageMaker operators or using the Airflow PythonOperator.
-
- 1. SageMaker Operators: In Airflow 1.10.1, the SageMaker team contributed special operators for SageMaker operations.
-    Each operator takes a configuration dictionary that defines the corresponding operation. We provide APIs to generate
-    the configuration dictionary in the SageMaker Python SDK. Currently, the following SageMaker operators are supported:
-
-    * ``SageMakerTrainingOperator``
-    * ``SageMakerTuningOperator``
-    * ``SageMakerModelOperator``
-    * ``SageMakerTransformOperator``
-    * ``SageMakerEndpointConfigOperator``
-    * ``SageMakerEndpointOperator``
-
- 2. PythonOperator: An Airflow built-in operator that executes Python callables. You can use the PythonOperator to execute
-    operations in the SageMaker Python SDK to create a SageMaker workflow.
-
- Using Airflow on AWS
- ~~~~~~~~~~~~~~~~~~~~
-
- Turbine is an open-source AWS CloudFormation template that enables you to create an Airflow resource stack on AWS.
- You can get it here: https://github.com/villasv/aws-airflow-stack
-
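- As a minimal sketch (not taken from the Turbine docs), you could launch such a stack programmatically with boto3;
- the local template file name, stack name, and capabilities below are assumptions for illustration:
-
- .. code:: python
-
-     import boto3
-
-     cfn = boto3.client('cloudformation')
-
-     # Assumes the Turbine template has been downloaded locally; the file name is hypothetical.
-     with open('turbine-template.yml') as f:
-         template_body = f.read()
-
-     # The stack typically creates IAM roles, so IAM capabilities are required.
-     cfn.create_stack(
-         StackName='airflow-stack',
-         TemplateBody=template_body,
-         Capabilities=['CAPABILITY_NAMED_IAM'])
-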
- Using Airflow SageMaker Operators
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
- Starting with Airflow 1.10.1, you can use SageMaker operators in Airflow. All SageMaker operators take a configuration
- dictionary that can be generated by the SageMaker Python SDK. For example:
-
- .. code:: python
-
-     import sagemaker
-     from sagemaker.tensorflow import TensorFlow
-     from sagemaker.workflow.airflow import training_config, transform_config_from_estimator
-
-     estimator = TensorFlow(entry_point='tf_train.py',
-                            role='sagemaker-role',
-                            framework_version='1.11.0',
-                            training_steps=1000,
-                            evaluation_steps=100,
-                            train_instance_count=2,
-                            train_instance_type='ml.p2.xlarge')
-
-     # train_config specifies SageMaker training configuration
-     train_config = training_config(estimator=estimator,
-                                    inputs=your_training_data_s3_uri)
-
-     # trans_config specifies SageMaker batch transform configuration
-     # task_id specifies which operator the training job is associated with; task_type specifies whether the
-     # operator is a training operator or a tuning operator
-     trans_config = transform_config_from_estimator(estimator=estimator,
-                                                    task_id='tf_training',
-                                                    task_type='training',
-                                                    instance_count=1,
-                                                    instance_type='ml.m4.xlarge',
-                                                    data=your_transform_data_s3_uri,
-                                                    content_type='text/csv')
-
- Now you can pass these configurations to the corresponding SageMaker operators and create the workflow:
-
- .. code:: python
-
-     import airflow
-     from airflow import DAG
-     from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
-     from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator
-
-     default_args = {
-         'owner': 'airflow',
-         'start_date': airflow.utils.dates.days_ago(2),
-         'provide_context': True
-     }
-
-     dag = DAG('tensorflow_example', default_args=default_args,
-               schedule_interval='@once')
-
-     train_op = SageMakerTrainingOperator(
-         task_id='tf_training',
-         config=train_config,
-         wait_for_completion=True,
-         dag=dag)
-
-     transform_op = SageMakerTransformOperator(
-         task_id='tf_transform',
-         config=trans_config,
-         wait_for_completion=True,
-         dag=dag)
-
-     transform_op.set_upstream(train_op)
-
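- The other operators follow the same pattern. As one more example, here is a minimal sketch of driving
- ``SageMakerTuningOperator`` with a configuration generated by ``tuning_config``; the objective metric,
- metric regex, and hyperparameter ranges below are illustrative assumptions, not prescribed values:
-
- .. code:: python
-
-     from airflow.contrib.operators.sagemaker_tuning_operator import SageMakerTuningOperator
-     from sagemaker.tuner import HyperparameterTuner, ContinuousParameter
-     from sagemaker.workflow.airflow import tuning_config
-
-     # Reuses the estimator, dag, and your_training_data_s3_uri defined above.
-     # The metric name and regex must match whatever your training script actually logs.
-     tuner = HyperparameterTuner(
-         estimator=estimator,
-         objective_metric_name='loss',
-         objective_type='Minimize',
-         metric_definitions=[{'Name': 'loss', 'Regex': 'loss = ([0-9\\.]+)'}],
-         hyperparameter_ranges={'learning_rate': ContinuousParameter(0.001, 0.1)},
-         max_jobs=4,
-         max_parallel_jobs=2)
-
-     # tune_config specifies the SageMaker hyperparameter tuning configuration
-     tune_config = tuning_config(tuner=tuner, inputs=your_training_data_s3_uri)
-
-     tune_op = SageMakerTuningOperator(
-         task_id='tf_tuning',
-         config=tune_config,
-         wait_for_completion=True,
-         dag=dag)
-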
- Using Airflow Python Operator
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
- `Airflow PythonOperator <https://airflow.apache.org/howto/operator.html?#pythonoperator>`_
- is a built-in operator that can execute any Python callable. If you want to build the SageMaker workflow in a more
- flexible way, write your Python callables for SageMaker operations by using the SageMaker Python SDK.
-
- .. code:: python
-
-     from sagemaker.tensorflow import TensorFlow
-
-     # callable for SageMaker training in TensorFlow
-     def train(data, **context):
-         estimator = TensorFlow(entry_point='tf_train.py',
-                                role='sagemaker-role',
-                                framework_version='1.11.0',
-                                training_steps=1000,
-                                evaluation_steps=100,
-                                train_instance_count=2,
-                                train_instance_type='ml.p2.xlarge')
-         estimator.fit(data)
-         return estimator.latest_training_job.job_name
-
-     # callable for SageMaker batch transform
-     def transform(data, **context):
-         training_job = context['ti'].xcom_pull(task_ids='training')
-         estimator = TensorFlow.attach(training_job)
-         transformer = estimator.transformer(instance_count=1, instance_type='ml.c4.xlarge')
-         transformer.transform(data, content_type='text/csv')
-
- Then build your workflow by using the PythonOperator with the Python callables defined above:
-
- .. code:: python
-
-     import airflow
-     from airflow import DAG
-     from airflow.operators.python_operator import PythonOperator
-
-     default_args = {
-         'owner': 'airflow',
-         'start_date': airflow.utils.dates.days_ago(2),
-         'provide_context': True
-     }
-
-     dag = DAG('tensorflow_example', default_args=default_args,
-               schedule_interval='@once')
-
-     train_op = PythonOperator(
-         task_id='training',
-         python_callable=train,
-         op_args=[training_data_s3_uri],
-         provide_context=True,
-         dag=dag)
-
-     transform_op = PythonOperator(
-         task_id='transform',
-         python_callable=transform,
-         op_args=[transform_data_s3_uri],
-         provide_context=True,
-         dag=dag)
-
-     transform_op.set_upstream(train_op)
-
- This completes a workflow that runs a SageMaker training job followed by a batch transform job. You can customize the
- Python callables with the SageMaker Python SDK according to your needs, and build more flexible and powerful workflows.
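- For instance, you could add a third callable that deploys the trained model to a SageMaker endpoint. The sketch
- below reuses the DAG and callables above; the endpoint instance type is an assumption for illustration:
-
- .. code:: python
-
-     # callable for deploying the trained model to a real-time endpoint
-     def deploy(**context):
-         training_job = context['ti'].xcom_pull(task_ids='training')
-         estimator = TensorFlow.attach(training_job)
-         estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')
-
-     deploy_op = PythonOperator(
-         task_id='deploy',
-         python_callable=deploy,
-         provide_context=True,
-         dag=dag)
-
-     deploy_op.set_upstream(train_op)
-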
+ For information about using SageMaker Workflow, see https://sagemaker.readthedocs.io/en/stable/using_workflow.html.