
Commit 5310f9e

leahecole, kaiyang-code, engelke, m-strzelczyk, and dandhlee authored
data analytics tutorial expansion project (#8290)
* Kaiyang expansion project 2022 (#8224)
* changed the DAG to load the GHCN dataset
* data preprocessing done
* modified preprocessing
* dataproc file added
* code runs great
* modified code based on Brad's feedback, still buggy
* finished modifying, haven't synced with DAG
* finished modifying DAG code
* ready for draft PR
* pass lint
* addressed Brad and Leah's comments
* pass nox lint
* pass nox lint
* Fix: Retry CLI launch if needed (#8221)
* Fix: add region tags
* Fix: region tag typos
* Fix: urlpatterns moved to end
* Fix: typo
* Fix: cli retries to fix flakiness
* Fix: remove duplicate tags
* Fix: use backoff for retries
* Fix: lint import order error
* address Leah's comments about typo and comments (Co-authored-by: Charles Engelke <[email protected]>)
* run blacken on DAG and dataproc code
* WIP: not-working test for process job
* working test for expansion dataproc script
* move dataproc expansion files to separate directory
* add readme
* update readme
* run black
* ignore data file
* fix import order
* ignore one line of lint because it's being silly
* add check for NotFound for test
* add requirements files
* add noxfile config
* update try/except
* experiment - fully qualify path
* update filepath
* update path
* try different path
* remove the directory that was causing test problems
* fix typo in header checker
* tell folks to skip cleanup of prereq
* clean up hyperlinks for distance weighting and arithmetic mean
* fix math links again
* remove debug statements
* remove commented-out variables
* Update composer/2022_airflow_summit/data_analytics_dag_expansion_test.py (Co-authored-by: Dan Lee <[email protected]>)
* Apply suggestions from code review (Co-authored-by: Dan Lee <[email protected]>)
* Apply suggestions from code review
* update apache-beam version (#8302): bumping the `apache-beam[gcp]` version to (indirectly) bump the `google-cloud-pubsub` version to accept the keyword argument `request` on `create_topic()`
* dataflow: replace job name underscores with hyphens (#8303): it looks like Dataflow no longer accepts underscores in job names; replacing them with hyphens should work
* fix test checks
* improve error reporting
* fix test name for exception handling
* chore(deps): update dependency datalab to v1.2.1 (#8309)
* fix: unsanitized output (#8316)
* fix: unsanitized output
* fix: add license to template
* chore(deps): update dependency cryptography to v38 (#8317)
* chore(deps): update dependency cryptography to v38
* lint (Co-authored-by: Anthonios Partheniou <[email protected]>)
* Remove region tags to be consistent with other languages (#8322)
* fix lint in conftest (#8324)
* Pin perl version to 5.34.0 as latest doesn't work with the example. (#8319) (Co-authored-by: Leah E. Cole <[email protected]>)
* refactor fixtures
* revert last change
* revert last change
* chore(deps): update dependency tensorflow to v2.7.2 [security] (#8329)
* remove backoff, add manual retry (#8328)
* remove backoff, add manual retry
* fix lint
* remove unused import (Co-authored-by: Anthonios Partheniou <[email protected]>)
* refactor test to match #8328
* update most write methods, fix test issue with comparing to exception
* Bmiro kaiyang edit (#8350)
* modified code to more closely adhere to Spark best practices
* remove unnecessary import
* improved explanation of Inverse Distance Weighting
* Apply suggestions from code review (Co-authored-by: Leah E. Cole <[email protected]>)
* run black on process files
* fix relative import issue
* fixed JVM error (#8360)
* Add UDF type hinting (#8361)
* fixed JVM error
* add type hinting to UDF
* Update composer/2022_airflow_summit/data_analytics_process_expansion.py
* fix comment alignment
* change dataproc region to northamerica-northeast1
* refactor import
* switch other test to also use northamerica-northeast1

Co-authored-by: kaiyang-code <[email protected]>
Co-authored-by: Charles Engelke <[email protected]>
Co-authored-by: Maciej Strzelczyk <[email protected]>
Co-authored-by: Dan Lee <[email protected]>
Co-authored-by: David Cavazos <[email protected]>
Co-authored-by: WhiteSource Renovate <[email protected]>
Co-authored-by: Anthonios Partheniou <[email protected]>
Co-authored-by: Averi Kitsch <[email protected]>
Co-authored-by: mhenc <[email protected]>
Co-authored-by: Brad Miro <[email protected]>
1 parent a18c139 commit 5310f9e

13 files changed (+249882, -10 lines)

.github/header-checker-lint.yml

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ ignoreFiles:
   - "**/constraints.txt"
   - "**/constraints-test.txt"
   - "**/apt.txt"
-  - "**/ghcn-stations.txt"
+  - "**/ghcnd-stations.txt"


 sourceFileExtensions:

composer/2022_airflow_summit/DATAPROC_EXPANSION_README.md

Lines changed: 36 additions & 0 deletions

# Dataproc extension for the Data Analytics Example

## Data in this directory

* [`ghcnd-stations.txt`](./ghcnd-stations.txt) is a freely available dataset about weather stations used in [US government climate data](https://www.ncei.noaa.gov/access/metadata/landing-page/bin/iso?id=gov.noaa.ncdc:C00861). A direct download link can be found at that linked site.
* [`ghcn-stations-processed.csv`](./ghcn-stations-processed.csv) is generated from the `ghcnd-stations.txt` text file. To generate this file yourself, run `python data_processing_helper.py` from this directory (a sketch of that conversion follows this list).
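The helper script itself is not part of this excerpt. As a rough illustration only, a minimal sketch of the conversion might look like the following, assuming NOAA's published fixed-width layout for `ghcnd-stations.txt` and the CSV columns (`ID`, `LATITUDE`, `LONGITUDE`, `ELEVATION`, `STATE`, `NAME`) that the DAG's BigQuery load step expects; the column positions and the use of `pandas` are assumptions, not the repository's actual helper.

```python
# Illustrative sketch only -- the real data_processing_helper.py may differ.
# Converts NOAA's fixed-width ghcnd-stations.txt into the CSV the DAG ingests.
import pandas as pd

# Assumed column positions, following NOAA's documented ghcnd-stations.txt format.
COLSPECS = [(0, 11), (12, 20), (21, 30), (31, 37), (38, 40), (41, 71)]
NAMES = ["ID", "LATITUDE", "LONGITUDE", "ELEVATION", "STATE", "NAME"]

stations = pd.read_fwf("ghcnd-stations.txt", colspecs=COLSPECS, names=NAMES)
stations.to_csv("ghcn-stations-processed.csv", index=False)
```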
## Prerequisites

Go through the tutorial to [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud), skipping the cleanup steps.

## About this example

This directory has a DAG similar to the data analytics DAG found in the [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud) tutorial, but it includes a more complicated data processing step with Dataproc. Instead of answering the question "How warm was it in Chicago on Thanksgiving for the past 25 years?", you will answer the question "How have the rainfall patterns changed over the past 25 years in the western part of the US and in Phoenix, AZ?" For this example, the western part of the US is defined as the [census-defined West region](https://www2.census.gov/geo/pdfs/maps-data/maps/reference/us_regdiv.pdf). Phoenix is used because it is a city that has been affected by climate change in recent years, especially with respect to water.

The Dataproc Serverless job uses the [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) to calculate precipitation and snowfall across the western states, and uses [inverse distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) to focus on the area around Phoenix.
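For intuition (this is not code from the commit): the arithmetic mean treats every western station equally, while inverse distance weighting weights each station's value by `1/d^2`, so stations nearest Phoenix dominate the estimate. A minimal Python sketch of the two aggregations, assuming plain latitude/longitude inputs and a haversine great-circle distance:

```python
# Hedged illustration of the two aggregation methods named above; the actual
# Dataproc job lives in data_analytics_process_expansion.py and may differ.
import math

PHX_LAT, PHX_LON = 33.4484, -112.0740  # approximate Phoenix, AZ coordinates (assumption)


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points, in kilometers."""
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    a = (
        math.sin((lat2 - lat1) / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * 6371.0 * math.asin(math.sqrt(a))


def arithmetic_mean(values):
    """Unweighted average of station observations."""
    return sum(values) / len(values)


def inverse_distance_weighted(stations):
    """stations: iterable of (lat, lon, value). Weight each value by 1 / distance^2."""
    numerator = denominator = 0.0
    for lat, lon, value in stations:
        d = max(haversine_km(lat, lon, PHX_LAT, PHX_LON), 1e-6)  # avoid divide-by-zero
        w = 1.0 / d ** 2
        numerator += w * value
        denominator += w
    return numerator / denominator
```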

The DAG has three steps:

1. Ingest the data about the weather stations from Cloud Storage into BigQuery
2. Use BigQuery to join the weather station data with the data used in the prior tutorial - the [GHCN data](https://console.cloud.google.com/marketplace/details/noaa-public/ghcn-d?_ga=2.256175883.1820196808.1661536029-806997694.1661364277) - and write the results to a table
3. Run a Dataproc Serverless job (a rough sketch follows this list) that processes the data by:
   1. Removing any data points that are not from weather stations located in the Western US
   2. Removing any data points that are not about snow or other precipitation (data where `ELEMENT` is not `SNOW` or `PRCP`)
   3. Converting the values for rows where `ELEMENT` is `SNOW` or `PRCP` from tenths of a millimeter to millimeters
   4. Extracting the year from the date so the `Date` column is left with only the year
   5. Calculating the [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) of precipitation and of snowfall
   6. Calculating the [inverse distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) for Phoenix
   7. Writing the results to tables in BigQuery
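The processing itself is implemented in `data_analytics_process_expansion.py`, which is not part of this excerpt. As a rough PySpark sketch only (the real job may be organized quite differently), steps 1-5 could look like the following, using the column names and the `ghcnd_stations_joined` table that the DAG writes; the list of western states is an assumption based on the census West region:

```python
# Rough, hedged sketch of the Dataproc Serverless processing steps; this is not
# the repository's data_analytics_process_expansion.py.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Assumption: states in the census-defined West region.
WESTERN_STATES = ["AK", "AZ", "CA", "CO", "HI", "ID", "MT", "NM", "NV", "OR", "UT", "WA", "WY"]

spark = SparkSession.builder.appName("data-processing-sketch").getOrCreate()

# Read the joined stations/GHCN table written by the DAG's BigQuery step.
joined = (
    spark.read.format("bigquery")
    .option("table", "precipitation_changes.ghcnd_stations_joined")
    .load()
)

normalized = (
    joined.where(F.col("STATE").isin(WESTERN_STATES))   # 1. keep western stations
    .where(F.col("ELEMENT").isin("SNOW", "PRCP"))        # 2. keep snow/precipitation rows
    .withColumn("VALUE", F.col("VALUE") / 10.0)          # 3. tenths of mm -> mm
    .withColumn("DATE", F.year(F.col("DATE")))           # 4. keep only the year
)

# 5. Arithmetic mean of annual precipitation (snowfall is analogous).
prcp_mean = (
    normalized.where(F.col("ELEMENT") == "PRCP")
    .groupBy("DATE")
    .agg(F.avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
)
```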
## Running this sample

* Add `data_analytics_dag_expansion.py` to the Composer environment you used in the previous tutorial
* Add `data_analytics_process_expansion.py` and `ghcn-stations-processed.csv` to the Cloud Storage bucket you created in the previous tutorial
* Create an empty BigQuery dataset called `precipitation_changes` (a sketch for this step follows below)

You do not need to add any additional Airflow variables, add any additional permissions, or create any other resources.
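The empty dataset from the list above can be created however you prefer; a minimal sketch with the BigQuery Python client (the `US` location is an assumption that matches the DAG's BigQuery job location):

```python
# Minimal sketch: create the empty dataset the DAG writes to.
# Assumes application default credentials and the google-cloud-bigquery library.
from google.cloud import bigquery

client = bigquery.Client()
dataset = bigquery.Dataset(f"{client.project}.precipitation_changes")
dataset.location = "US"  # assumption: matches the DAG's BigQuery job location
client.create_dataset(dataset, exists_ok=True)
```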

composer/2022_airflow_summit/README.md

Lines changed: 5 additions & 1 deletion
@@ -11,7 +11,11 @@ Running a number of tasks in parallel showcases autoscaling in a Cloud Composer
 
 ## data_analytics_dag
 
-Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless
+Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless. More detailed documentation for this DAG can be found [in the Composer documentation](https://cloud.google.com/composer/docs/data-analytics-googlecloud).
+
+## data_analytics_dag_expansion
+
+This DAG is nearly identical to `data_analytics_dag`, but it features a more complex Dataproc job. For more info, refer to the [README](./DATAPROC_EXPANSION_README.md).
 
 ## retries
 
composer/2022_airflow_summit/data_analytics_dag_expansion.py

Lines changed: 140 additions & 0 deletions

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This DAG script is an expansion of data_analytics_dag.py that runs a more complex Dataproc job found in data_analytics_process_expansion.py

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "precipitation_changes"
BQ_DESTINATION_TABLE_NAME = "ghcnd_stations_joined"
BQ_NORMALIZED_TABLE_NAME = "ghcnd_stations_normalized"
BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_snow_mean"
BQ_PHX_PRCP_TABLE_NAME = "phx_annual_prcp"
BQ_PHX_SNOW_TABLE_NAME = "phx_annual_snow"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process_expansion.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PRCP_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_SNOW_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["ghcn-stations-processed.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.ghcnd-stations-new",
        source_format="CSV",
        schema_fields=[
            {"name": "ID", "type": "STRING", "mode": "REQUIRED"},
            {"name": "LATITUDE", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "LONGITUDE", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "ELEVATION", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "STATE", "type": "STRING", "mode": "NULLABLE"},
            {"name": "NAME", "type": "STRING", "mode": "REQUIRED"},
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:

        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            GHCND_STATIONS_JOIN_QUERY = f"""
            SELECT Stations.ID, Stations.LATITUDE, Stations.LONGITUDE,
            Stations.STATE, Table.DATE, Table.ELEMENT, Table.VALUE
            FROM `{PROJECT_NAME}.{BQ_DESTINATION_DATASET_NAME}.ghcnd-stations-new` AS Stations, {BQ_DATASET_NAME} AS Table
            WHERE Stations.ID = Table.id
            """

            bq_join_stations_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_stations_data_{str(year)}",
                configuration={
                    "query": {
                        "query": GHCND_STATIONS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    load_external_dataset >> bq_join_group >> create_batch

composer/2022_airflow_summit/data_analytics_dag_expansion_test.py

Lines changed: 28 additions & 0 deletions

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import internal_unit_testing


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a confidence check recommended by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """

    import data_analytics_dag_expansion

    internal_unit_testing.assert_has_valid_dag(data_analytics_dag_expansion)
