
Commit 3860a15

kaiyang-code authored and engelke committed
Kaiyang expansion project 2022 (#8224)
* changed the DAG to load the GHCN dataset
* data preprocessing done
* modified preprocessing
* dataproc file added
* code runs great
* modified code based on Brad's comments, still buggy
* finished modifying, haven't synced with DAG
* finished modifying DAG codes
* ready for draft PR
* pass lint
* addressed Brad and Leah's comments
* pass nox lint
* pass nox lint
* Fix: Retry CLI launch if needed (#8221)
* Fix: add region tags
* Fix: region tag typos
* Fix: urlpatterns moved to end
* Fix: typo
* Fix: cli retries to fix flakiness
* Fix: remove duplicate tags
* Fix: use backoff for retries
* Fix: lint import order error
* address Leah's comments about typo and comments

Co-authored-by: Charles Engelke <[email protected]>
1 parent d4d26d4 commit 3860a15

File tree

3 files changed: +420 -0 lines changed

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This DAG script is an expansion of data_analytics_dag.py that runs a more complex Dataproc job found in data_analytics_process_expansion.py

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "expansion_project"
BQ_DESTINATION_TABLE_NAME = "ghcnd_stations_joined"
BQ_NORMALIZED_TABLE_NAME = "ghcnd_stations_normalized"
BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_snow_mean"
BQ_PHX_PRCP_TABLE_NAME = "phx_annual_prcp"
BQ_PHX_SNOW_TABLE_NAME = "phx_annual_snow"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process_expansion.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PRCP_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_SNOW_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    }
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["ghcnd-stations-new.txt"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.ghcnd-stations-new",
        source_format="CSV",
        schema_fields=[
            {'name': 'ID', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'LATITUDE', 'type': 'FLOAT', 'mode': 'REQUIRED'},
            {'name': 'LONGITUDE', 'type': 'FLOAT', 'mode': 'REQUIRED'},
            {'name': 'ELEVATION', 'type': 'FLOAT', 'mode': 'REQUIRED'},
            {'name': 'STATE', 'type': 'STRING', 'mode': 'REQUIRED'},
        ],
        write_disposition="WRITE_TRUNCATE"
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:

        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            # Specifically query a Chicago weather station
            GHCND_STATIONS_JOIN_QUERY = f"""
            SELECT Stations.ID, Stations.LATITUDE, Stations.LONGITUDE,
            Stations.STATE, Table.DATE, Table.ELEMENT, Table.VALUE
            FROM `{PROJECT_NAME}.expansion_project.ghcnd-stations-new` AS Stations, {BQ_DATASET_NAME} AS Table
            WHERE Stations.ID = Table.id
            """

            bq_join_stations_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_stations_data_{str(year)}",
                configuration={
                    "query": {
                        "query": GHCND_STATIONS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    load_external_dataset >> bq_join_group >> create_batch
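The third file in this commit (presumably data_analytics_process_expansion.py, the Dataproc Serverless job that create_batch submits) is not rendered on this page. As a rough, hypothetical sketch of the kind of PySpark batch job that could consume the arguments listed in BATCH_CONFIG, the following reads the joined table, normalizes GHCN precipitation values, and writes an annual mean back to BigQuery. The column handling, connector options, and omission of the snow and Phoenix outputs are assumptions, not the committed code.

# Hypothetical sketch only -- not the data_analytics_process_expansion.py from this commit.
import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == "__main__":
    # Argument order mirrors BATCH_CONFIG["pyspark_batch"]["args"] in the DAG above.
    # The real job also receives the snow-mean and Phoenix table names (argv[5:8]),
    # which this sketch does not populate.
    bucket_name = sys.argv[1]
    joined_table = sys.argv[2]
    normalized_table = sys.argv[3]
    prcp_mean_table = sys.argv[4]

    spark = SparkSession.builder.appName("data-processing-expansion-sketch").getOrCreate()
    # The spark-bigquery connector stages indirect writes through a GCS bucket.
    spark.conf.set("temporaryGcsBucket", bucket_name)

    stations = spark.read.format("bigquery").option("table", joined_table).load()

    # GHCN reports PRCP in tenths of a millimeter; convert those rows to millimeters.
    normalized = stations.withColumn(
        "VALUE",
        F.when(F.col("ELEMENT") == "PRCP", F.col("VALUE") / 10).otherwise(F.col("VALUE")),
    )
    normalized.write.format("bigquery").option("table", normalized_table).mode("overwrite").save()

    # Mean annual precipitation across all joined stations.
    prcp_mean = (
        normalized.filter(F.col("ELEMENT") == "PRCP")
        .groupBy(F.year("DATE").alias("YEAR"))
        .agg(F.avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
    )
    prcp_mean.write.format("bigquery").option("table", prcp_mean_table).mode("overwrite").save()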
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import internal_unit_testing


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a recommended confidence check by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """

    import data_analytics_dag_expansion as module

    internal_unit_testing.assert_has_valid_dag(module)
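internal_unit_testing is a shared test helper in the samples repository rather than part of this commit, so its implementation is not shown here. A minimal sketch of what an assert_has_valid_dag-style check typically does, under the assumption that it only verifies the imported module defines at least one Airflow DAG, could look like this:

# Hypothetical sketch of an assert_has_valid_dag-style helper; the real
# internal_unit_testing module may perform additional checks.
from airflow import models


def assert_has_valid_dag(module) -> None:
    """Assert that the imported module defines at least one Airflow DAG object."""
    dags = [
        candidate
        for candidate in vars(module).values()
        if isinstance(candidate, models.DAG)
    ]
    if not dags:
        raise AssertionError(f"{module.__name__} defines no airflow.models.DAG objects")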
