
Commit c879b3a

Fix 8427 (#8498)
* WIP: trial and error for dynamically checking airflow version
* ran black, updated dag to be 2.3+ compatible
* fix lint, bump cncf providers version
* address dan suggestion
* update gke operator
1 parent 32ec65c commit c879b3a

4 files changed: +109 -93 lines changed

composer/workflows/constraints.txt

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ apache-airflow-providers-arangodb==2.0.0
 apache-airflow-providers-asana==2.0.0
 apache-airflow-providers-celery==3.0.0
 apache-airflow-providers-cloudant==3.0.0
-apache-airflow-providers-cncf-kubernetes==4.1.0
+apache-airflow-providers-cncf-kubernetes==4.3.0
 apache-airflow-providers-databricks==3.0.0
 apache-airflow-providers-datadog==3.0.0
 apache-airflow-providers-dbt-cloud==2.0.0
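Both constraints.txt and requirements.txt (below) move the cncf-kubernetes provider pin from 4.1.0 to 4.3.0. A minimal sketch, not part of this commit, for confirming inside the Airflow environment that the bumped pin actually resolved before relying on the newer operator API (standard library only):

from importlib.metadata import version

# Hypothetical check, not in the sample DAGs: confirm the installed provider
# matches the pin bumped by this commit.
installed = version("apache-airflow-providers-cncf-kubernetes")
assert installed == "4.3.0", f"unexpected provider version: {installed}"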

composer/workflows/gke_operator.py

Lines changed: 14 additions & 12 deletions
@@ -22,9 +22,10 @@
     GKEDeleteClusterOperator,
     GKEStartPodOperator,
 )
-
 from airflow.utils.dates import days_ago
 
+from kubernetes.client import models as k8s_models
+
 
 with models.DAG(
     "example_gcp_gke",
@@ -175,7 +176,7 @@
         location=CLUSTER_REGION,
         cluster_name=CLUSTER_NAME,
         namespace="default",
-        image="perl",
+        image="perl:5.34.0",
         # Entrypoint of the container, if not specified the Docker container's
         # entrypoint is used. The cmds parameter is templated.
         cmds=["perl"],
@@ -202,16 +203,17 @@
         # Can be a large range of data, and can include characters that are not
         # permitted by labels.
         annotations={"key1": "value1"},
-        # Resource specifications for Pod, this will allow you to set both cpu
-        # and memory limits and requirements.
-        # Prior to Airflow 1.10.4, resource specifications were
-        # passed as a Pod Resources Class object,
-        # If using this example on a version of Airflow prior to 1.10.4,
-        # import the "pod" package from airflow.contrib.kubernetes and use
-        # resources = pod.Resources() instead passing a dict
-        # For more info see:
-        # https://github.com/apache/airflow/pull/4551
-        resources={"limit_memory": "250M", "limit_cpu": "100m"},
+        # Optional resource specifications for Pod, this will allow you to
+        # set both cpu and memory limits and requirements.
+        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
+        # resources were passed as a dictionary. This change was made in
+        # https://github.com/apache/airflow/pull/27197
+        # Additionally, "memory" and "cpu" were previously named
+        # "limit_memory" and "limit_cpu"
+        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
+        container_resources=k8s_models.V1ResourceRequirements(
+            limits={"memory": "250M", "cpu": "100m"},
+        ),
         # If true, the content of /airflow/xcom/return.json from container will
         # also be pushed to an XCom when the container ends.
         do_xcom_push=False,
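The substantive change above swaps the legacy resources dict for the container_resources argument. A minimal sketch, assuming the kubernetes client that ships with the pinned provider, of the V1ResourceRequirements object it now takes; the diff only sets limits, and the requests values here are illustrative, not taken from the sample:

from kubernetes.client import models as k8s_models

# Object passed to container_resources on GKEStartPodOperator /
# KubernetesPodOperator. The requests values are hypothetical examples;
# the limits mirror the ones used in the diff above.
pod_resources = k8s_models.V1ResourceRequirements(
    requests={"memory": "128M", "cpu": "50m"},
    limits={"memory": "250M", "cpu": "100m"},
)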

composer/workflows/kubernetes_pod_operator.py

Lines changed: 93 additions & 79 deletions
@@ -19,8 +19,10 @@
 
 from airflow import models
 from airflow.kubernetes.secret import Secret
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
-
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+    KubernetesPodOperator,
+)
+from kubernetes.client import models as k8s_models
 
 # A Secret is an object that contains a small amount of sensitive data such as
 # a password, a token, or a key. Such information might otherwise be put in a
@@ -31,24 +33,25 @@
 # [START composer_kubernetespodoperator_secretobject]
 secret_env = Secret(
     # Expose the secret as environment variable.
-    deploy_type='env',
+    deploy_type="env",
     # The name of the environment variable, since deploy_type is `env` rather
     # than `volume`.
-    deploy_target='SQL_CONN',
+    deploy_target="SQL_CONN",
     # Name of the Kubernetes Secret
-    secret='airflow-secrets',
+    secret="airflow-secrets",
     # Key of a secret stored in this Secret object
-    key='sql_alchemy_conn')
+    key="sql_alchemy_conn",
+)
 secret_volume = Secret(
-    deploy_type='volume',
+    deploy_type="volume",
     # Path where we mount the secret as volume
-    deploy_target='/var/secrets/google',
+    deploy_target="/var/secrets/google",
     # Name of Kubernetes Secret
-    secret='service-account',
+    secret="service-account",
     # Key in the form of service account file name
-    key='service-account.json')
+    key="service-account.json",
+)
 # [END composer_kubernetespodoperator_secretobject]
-
 # If you are running Airflow in more than one time zone
 # see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
 # for best practices
@@ -58,9 +61,10 @@
 # will show the task as failed, as well as contain all of the task logs
 # required to debug.
 with models.DAG(
-    dag_id='composer_sample_kubernetes_pod',
-    schedule_interval=datetime.timedelta(days=1),
-    start_date=YESTERDAY) as dag:
+    dag_id="composer_sample_kubernetes_pod",
+    schedule_interval=datetime.timedelta(days=1),
+    start_date=YESTERDAY,
+) as dag:
     # Only name, namespace, image, and task_id are required to create a
     # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
     # to using the config file found at `/home/airflow/composer_kube_config if
@@ -71,12 +75,12 @@
     # [START composer_kubernetespodoperator_minconfig]
     kubernetes_min_pod = KubernetesPodOperator(
         # The ID specified for the task.
-        task_id='pod-ex-minimum',
+        task_id="pod-ex-minimum",
         # Name of task you want to run, used to generate Pod ID.
-        name='pod-ex-minimum',
+        name="pod-ex-minimum",
         # Entrypoint of the container, if not specified the Docker container's
         # entrypoint is used. The cmds parameter is templated.
-        cmds=['echo'],
+        cmds=["echo"],
         # The namespace to run within Kubernetes, default namespace is
         # `default`. In Composer 1 there is the potential for
         # the resource starvation of Airflow workers and scheduler
@@ -85,151 +89,160 @@
         # to satisfy the computing requirements. Alternatively, launching pods
         # into a custom namespace will stop fighting over resources,
         # and using Composer 2 will mean the environment will autoscale.
-        namespace='default',
+        namespace="default",
         # Docker image specified. Defaults to hub.docker.com, but any fully
         # qualified URLs will point to a custom repository. Supports private
         # gcr.io images if the Composer Environment is under the same
         # project-id as the gcr.io images and the service account that Composer
         # uses has permission to access the Google Container Registry
         # (the default service account has permission)
-        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
+        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
+    )
     # [END composer_kubernetespodoperator_minconfig]
     # [START composer_kubernetespodoperator_templateconfig]
     kubenetes_template_ex = KubernetesPodOperator(
-        task_id='ex-kube-templates',
-        name='ex-kube-templates',
-        namespace='default',
-        image='bash',
+        task_id="ex-kube-templates",
+        name="ex-kube-templates",
+        namespace="default",
+        image="bash",
         # All parameters below are able to be templated with jinja -- cmds,
         # arguments, env_vars, and config_file. For more information visit:
         # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
-
         # Entrypoint of the container, if not specified the Docker container's
         # entrypoint is used. The cmds parameter is templated.
-        cmds=['echo'],
+        cmds=["echo"],
         # DS in jinja is the execution date as YYYY-MM-DD, this docker image
         # will echo the execution date. Arguments to the entrypoint. The docker
         # image's CMD is used if this is not provided. The arguments parameter
         # is templated.
-        arguments=['{{ ds }}'],
+        arguments=["{{ ds }}"],
         # The var template variable allows you to access variables defined in
         # Airflow UI. In this case we are getting the value of my_value and
         # setting the environment variable `MY_VALUE`. The pod will fail if
         # `my_value` is not set in the Airflow UI.
-        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
+        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
         # Sets the config file to a kubernetes config file specified in
         # airflow.cfg. If the configuration file does not exist or does
         # not provide validcredentials the pod will fail to launch. If not
         # specified, config_file defaults to ~/.kube/config
-        config_file="{{ conf.get('core', 'kube_config') }}")
+        config_file="{{ conf.get('core', 'kube_config') }}",
+    )
     # [END composer_kubernetespodoperator_templateconfig]
     # [START composer_kubernetespodoperator_secretconfig]
     kubernetes_secret_vars_ex = KubernetesPodOperator(
-        task_id='ex-kube-secrets',
-        name='ex-kube-secrets',
-        namespace='default',
-        image='ubuntu',
+        task_id="ex-kube-secrets",
+        name="ex-kube-secrets",
+        namespace="default",
+        image="ubuntu",
         startup_timeout_seconds=300,
         # The secrets to pass to Pod, the Pod will fail to create if the
         # secrets you specify in a Secret object do not exist in Kubernetes.
         secrets=[secret_env, secret_volume],
         # env_vars allows you to specify environment variables for your
         # container to use. env_vars is templated.
         env_vars={
-            'EXAMPLE_VAR': '/example/value',
-            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json '})
+            "EXAMPLE_VAR": "/example/value",
+            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
+        },
+    )
     # [END composer_kubernetespodoperator_secretconfig]
     # [START composer_kubernetespodaffinity]
     # Pod affinity with the KubernetesPodOperator
     # is not supported with Composer 2
     # instead, create a cluster and use the GKEStartPodOperator
     # https://cloud.google.com/composer/docs/using-gke-operator
     kubernetes_affinity_ex = KubernetesPodOperator(
-        task_id='ex-pod-affinity',
-        name='ex-pod-affinity',
-        namespace='default',
-        image='perl:5.34.0',
-        cmds=['perl'],
-        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
+        task_id="ex-pod-affinity",
+        name="ex-pod-affinity",
+        namespace="default",
+        image="perl:5.34.0",
+        cmds=["perl"],
+        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
         # affinity allows you to constrain which nodes your pod is eligible to
         # be scheduled on, based on labels on the node. In this case, if the
         # label 'cloud.google.com/gke-nodepool' with value
         # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
         # nodes, it will fail to schedule.
         affinity={
-            'nodeAffinity': {
+            "nodeAffinity": {
                 # requiredDuringSchedulingIgnoredDuringExecution means in order
                 # for a pod to be scheduled on a node, the node must have the
                 # specified labels. However, if labels on a node change at
                 # runtime such that the affinity rules on a pod are no longer
                 # met, the pod will still continue to run on the node.
-                'requiredDuringSchedulingIgnoredDuringExecution': {
-                    'nodeSelectorTerms': [{
-                        'matchExpressions': [{
-                            # When nodepools are created in Google Kubernetes
-                            # Engine, the nodes inside of that nodepool are
-                            # automatically assigned the label
-                            # 'cloud.google.com/gke-nodepool' with the value of
-                            # the nodepool's name.
-                            'key': 'cloud.google.com/gke-nodepool',
-                            'operator': 'In',
-                            # The label key's value that pods can be scheduled
-                            # on.
-                            'values': [
-                                'pool-0',
-                                'pool-1',
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    # When nodepools are created in Google Kubernetes
+                                    # Engine, the nodes inside of that nodepool are
+                                    # automatically assigned the label
+                                    # 'cloud.google.com/gke-nodepool' with the value of
+                                    # the nodepool's name.
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    # The label key's value that pods can be scheduled
+                                    # on.
+                                    "values": [
+                                        "pool-0",
+                                        "pool-1",
+                                    ],
+                                }
                             ]
-                        }]
-                    }]
+                        }
+                    ]
                 }
             }
-        })
+        },
+    )
     # [END composer_kubernetespodaffinity]
     # [START composer_kubernetespodoperator_fullconfig]
     kubernetes_full_pod = KubernetesPodOperator(
-        task_id='ex-all-configs',
-        name='pi',
-        namespace='default',
-        image='perl:5.34.0',
+        task_id="ex-all-configs",
+        name="pi",
+        namespace="default",
+        image="perl:5.34.0",
         # Entrypoint of the container, if not specified the Docker container's
         # entrypoint is used. The cmds parameter is templated.
-        cmds=['perl'],
+        cmds=["perl"],
         # Arguments to the entrypoint. The docker image's CMD is used if this
         # is not provided. The arguments parameter is templated.
-        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
+        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
         # The secrets to pass to Pod, the Pod will fail to create if the
         # secrets you specify in a Secret object do not exist in Kubernetes.
         secrets=[],
         # Labels to apply to the Pod.
-        labels={'pod-label': 'label-name'},
+        labels={"pod-label": "label-name"},
         # Timeout to start up the Pod, default is 120.
         startup_timeout_seconds=120,
         # The environment variables to be initialized in the container
         # env_vars are templated.
-        env_vars={'EXAMPLE_VAR': '/example/value'},
+        env_vars={"EXAMPLE_VAR": "/example/value"},
         # If true, logs stdout output of container. Defaults to True.
         get_logs=True,
         # Determines when to pull a fresh image, if 'IfNotPresent' will cause
         # the Kubelet to skip pulling an image if it already exists. If you
         # want to always pull a new image, set it to 'Always'.
-        image_pull_policy='Always',
+        image_pull_policy="Always",
         # Annotations are non-identifying metadata you can attach to the Pod.
         # Can be a large range of data, and can include characters that are not
         # permitted by labels.
-        annotations={'key1': 'value1'},
+        annotations={"key1": "value1"},
         # Optional resource specifications for Pod, this will allow you to
         # set both cpu and memory limits and requirements.
-        # Prior to Airflow 1.10.4, resource specifications were
-        # passed as a Pod Resources Class object,
-        # If using this example on a version of Airflow prior to 1.10.4,
-        # import the "pod" package from airflow.contrib.kubernetes and use
-        # resources = pod.Resources() instead passing a dict
-        # For more info see:
-        # https://github.com/apache/airflow/pull/4551
-        resources={'limit_memory': "250M", 'limit_cpu': "100m"},
+        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
+        # resources were passed as a dictionary. This change was made in
+        # https://github.com/apache/airflow/pull/27197
+        # Additionally, "memory" and "cpu" were previously named
+        # "limit_memory" and "limit_cpu"
+        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
+        container_resources=k8s_models.V1ResourceRequirements(
+            limits={"memory": "250M", "cpu": "100m"},
+        ),
         # Specifies path to kubernetes config. If no config is specified will
         # default to '~/.kube/config'. The config_file is templated.
-        config_file='/home/airflow/composer_kube_config',
+        config_file="/home/airflow/composer_kube_config",
         # If true, the content of /airflow/xcom/return.json from container will
         # also be pushed to an XCom when the container ends.
         do_xcom_push=False,
@@ -244,6 +257,7 @@
         # is not supported with Composer 2
         # instead, create a cluster and use the GKEStartPodOperator
         # https://cloud.google.com/composer/docs/using-gke-operator
-        affinity={})
+        affinity={},
+    )
     # [END composer_kubernetespodoperator_fullconfig]
 # [END composer_kubernetespodoperator]
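The commit message mentions experimenting with dynamically checking the Airflow version. A hedged sketch of that idea, not what this commit ships: choose the resource keyword from the installed provider version. The 4.3.0 threshold is an assumption based on the pin in this commit, and the task id below is hypothetical.

from importlib.metadata import version

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# Assumption: providers at or above the 4.3.0 pin in this commit accept
# container_resources; older installs fall back to the legacy dict form.
provider = tuple(
    int(part)
    for part in version("apache-airflow-providers-cncf-kubernetes").split(".")[:2]
)
if provider >= (4, 3):
    resource_kwargs = {
        "container_resources": k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        )
    }
else:
    resource_kwargs = {"resources": {"limit_memory": "250M", "limit_cpu": "100m"}}

pi_pod = KubernetesPodOperator(
    task_id="pi-resource-demo",  # hypothetical task id, not from the sample
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    **resource_kwargs,
)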

composer/workflows/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 # see https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html#constraints-files
 apache-airflow==2.3.3
 apache-airflow-providers-apache-beam==4.0.0
-apache-airflow-providers-cncf-kubernetes==4.1.0
+apache-airflow-providers-cncf-kubernetes==4.3.0
 apache-airflow-providers-google==8.3.0
 scipy==1.8.1; python_version > '3.0'
 apache-airflow-providers-amazon==4.0.0
