data analytics tutorial expansion project #8290 (Merged)

Changes from all commits (61 commits):
* bcac5d4 Kaiyang expansion project 2022 (#8224) (kaiyang-code)
* 9cc22b9 run blacken on dag and dataproc code (leahecole)
* 4f03254 WIP: not working test for process job (leahecole)
* 73ead05 working test for expansion dataproc script (leahecole)
* c945c67 move dataproc expansion files to separate directory (leahecole)
* 977211a add readme (leahecole)
* 0160059 update readme (leahecole)
* d1f8464 run black (leahecole)
* d3a8321 ignore data file (leahecole)
* 739847c fix import order (leahecole)
* 290c5f9 ignore one line of lint because it's being silly (leahecole)
* c8d6638 add check for Notfound for test (leahecole)
* f73cf11 add requirements files (leahecole)
* dbb14b8 add noxfile config (leahecole)
* 3280031 update try/except (leahecole)
* 4ef01c0 experiment - fully qualify path (leahecole)
* 006fc96 update filepath (leahecole)
* 7964dff update path (leahecole)
* a91f961 try different path (leahecole)
* 70ecf3b remove the directory that was causing test problems (leahecole)
* 949f1ea Merge branch 'main' into kaiyang_expansion_project (leahecole)
* a84efb9 fix typo in header checker (leahecole)
* a36c471 Merge branch 'main' into kaiyang_expansion_project (m-strzelczyk)
* 447a3fc tell folks to skip cleanup of prereq (leahecole)
* 9276d3 clean up hyperlinks for distance weighting and arithmetic mean (leahecole)
* 51fdb0b fix math links again (leahecole)
* 7d93e6a remove debug statements (leahecole)
* e71c472 remove commented out variables (leahecole)
* c182fa8 Update composer/2022_airflow_summit/data_analytics_dag_expansion_test.py (leahecole)
* b2d35bf Apply suggestions from code review (leahecole)
* b4fbc66 Apply suggestions from code review (leahecole)
* 6116179 update apache-beam version (#8302) (davidcavazos)
* a4537c7 dataflow: replace job name underscores with hyphens (#8303) (davidcavazos)
* 0ba0578 chore(deps): update dependency datalab to v1.2.1 (#8309) (renovate-bot)
* ef78c07 fix: unsanitized output (#8316) (engelke)
* cce48dd chore(deps): update dependency cryptography to v38 (#8317) (renovate-bot)
* f6580d8 Remove region tags to be consistent with other languages (#8322) (averikitsch)
* 3e94a27 fix lint in conftest (#8324) (leahecole)
* 3d017a2 Pin perl version to 5.34.0 as latest doesn't work with the example. (…) (mhenc)
* 2f6a4ee refactor fixtures (leahecole)
* cbabba5 revert last change (leahecole)
* ee70c73 revert last change (leahecole)
* 7a8b0c0 chore(deps): update dependency tensorflow to v2.7.2 [security] (#8329) (renovate-bot)
* 0af547f remove backoff, add manual retry (#8328) (leahecole)
* 846570f Merge branch 'main' into kaiyang_expansion_project (leahecole)
* dba937c refactor test to match #8328 (leahecole)
* 1ce4787 update most write methods, fix test issue with comparing to exception (leahecole)
* 64537b4 Bmiro kaiyang edit (#8350) (bradmiro)
* f3602d1 Merge branch 'main' into kaiyang_expansion_project (leahecole)
* 72c6c08 run black on process files (leahecole)
* 0af7f90 fix relative import issue (leahecole)
* 8a017d0 Merge branch 'main' into kaiyang_expansion_project (leahecole)
* 80e3f12 fixed jvm error (#8360) (bradmiro)
* c531096 Add UDF type hinting (#8361) (bradmiro)
* f6b951d Update composer/2022_airflow_summit/data_analytics_process_expansion.py (leahecole)
* 8c345a5 fix comment alignment (leahecole)
* 5e0b214 Merge branch 'main' into kaiyang_expansion_project (leahecole)
* a486f78 change dataproc region to northamerica-northeast1 (leahecole)
* fb6793f Merge branch 'main' into kaiyang_expansion_project (leahecole)
* a00f400 refactor import (leahecole)
* 9302893 switch other test to also use northamerica-northeast1 (leahecole)
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# Dataproc extension for the Data Analytics Example

## Data in this directory
* [`ghcnd-stations.txt`](./ghcnd-stations.txt) is a freely available dataset about weather stations used in [US government climate data](https://www.ncei.noaa.gov/access/metadata/landing-page/bin/iso?id=gov.noaa.ncdc:C00861). A direct download link can be found at that linked site.
* [`ghcn-stations-processed.csv`](./ghcn-stations-processed.csv) is generated from the `ghcnd-stations.txt` text file. To generate this file yourself, run `python data_processing_helper.py` from this directory (a sketch of the conversion appears below).
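
The helper script itself is not part of this diff view. Purely as a rough illustration, a minimal version of this conversion might slice the NOAA fixed-width station file into the six columns the DAG later expects; the column offsets below follow the published GHCN-Daily `ghcnd-stations.txt` layout, but treat them and the whole snippet as an assumption, not the actual helper:

```python
# Hypothetical sketch, not the actual data_processing_helper.py.
# Assumes the GHCN-Daily fixed-width layout for ghcnd-stations.txt:
# ID cols 1-11, LATITUDE 13-20, LONGITUDE 22-30, ELEVATION 32-37,
# STATE 39-40, NAME 42-71 (1-indexed, inclusive).
import csv

with open("ghcnd-stations.txt") as src, open(
    "ghcn-stations-processed.csv", "w", newline=""
) as dst:
    writer = csv.writer(dst)
    for line in src:
        writer.writerow(
            [
                line[0:11].strip(),   # ID
                line[12:20].strip(),  # LATITUDE
                line[21:30].strip(),  # LONGITUDE
                line[31:37].strip(),  # ELEVATION
                line[38:40].strip(),  # STATE
                line[41:71].strip(),  # NAME
            ]
        )
```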

## Prerequisites
Go through the [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud) tutorial, skipping the cleanup steps.

## About this example

This directory has a DAG similar to the data analytics DAG found in the [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud) tutorial, but it includes a more complicated data processing step with Dataproc. Instead of answering the question "How warm was it in Chicago on Thanksgiving for the past 25 years?", you will answer the question "How have rainfall patterns changed over the past 25 years in the western part of the US and in Phoenix, AZ?" For this example, the western part of the US is defined as the [Census-defined West region](https://www2.census.gov/geo/pdfs/maps-data/maps/reference/us_regdiv.pdf). Phoenix is used because it is a city that has been affected by climate change in recent years, especially with respect to water.

The Dataproc Serverless job uses the [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) to calculate precipitation and snowfall across the western states, and uses [distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) to focus on the Phoenix-specific area.
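
For reference, the two estimators have roughly the following form. The inverse-square weight in the second formula follows the usual distance-weighting convention and is an assumption here, not something this sample spells out:

$$\bar{P} = \frac{1}{n}\sum_{i=1}^{n} P_i \qquad\qquad \hat{P}_{\mathrm{PHX}} = \frac{\sum_{i=1}^{n} P_i / d_i^{2}}{\sum_{i=1}^{n} 1 / d_i^{2}}$$

where $P_i$ is the precipitation (or snowfall) value reported by station $i$ and $d_i$ is that station's distance from Phoenix.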

The DAG has three steps:

1. Ingest the data about the weather stations from Cloud Storage into BigQuery.
2. Use BigQuery to join the weather station data with the data used in the prior tutorial, the [GHCN data](https://console.cloud.google.com/marketplace/details/noaa-public/ghcn-d), and write the results to a table.
3. Run a Dataproc Serverless job that processes the data by (a rough sketch of these transformations follows this list):
    1. Removing any data points that are not from weather stations located in the western US
    2. Removing any data points that are not about snow or other precipitation (data where `ELEMENT` is not `SNOW` or `PRCP`)
    3. Converting the `VALUE` measurements (for rows where `ELEMENT` is `SNOW` or `PRCP`) from tenths of a mm to mm
    4. Extracting the year from the date so the `Date` column is left with only the year
    5. Calculating the [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) of precipitation and of snowfall
    6. Calculating the [distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) for Phoenix
    7. Writing the results to tables in BigQuery
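
The processing script itself, `data_analytics_process_expansion.py`, is not part of this diff view. As a sketch only of sub-steps 1 through 4, assuming a Spark DataFrame with the joined table's columns (`ID`, `STATE`, `DATE`, `ELEMENT`, `VALUE`) and an illustrative, unverified list of West-region states:

```python
# Hypothetical sketch, not the real data_analytics_process_expansion.py.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Census West region states (assumed here for illustration).
WESTERN_STATES = [
    "AK", "AZ", "CA", "CO", "HI", "ID", "MT",
    "NM", "NV", "OR", "UT", "WA", "WY",
]

spark = SparkSession.builder.appName("data-processing-sketch").getOrCreate()

# Read the joined table written by the DAG's BigQuery step (name assumed).
df = spark.read.format("bigquery").load("precipitation_changes.ghcnd_stations_joined")

cleaned = (
    df.where(F.col("STATE").isin(WESTERN_STATES))  # keep western stations
    .where(F.col("ELEMENT").isin("PRCP", "SNOW"))  # keep rain/snow rows
    .withColumn("VALUE", F.col("VALUE") / 10)      # tenths of a mm -> mm
    .withColumn("DATE", F.year(F.col("DATE")))     # keep only the year
)
```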

## Running this sample
* Add `data_analytics_dag_expansion.py` to the Composer environment you used in the previous tutorial
* Add `data_analytics_process_expansion.py` and `ghcn-stations-processed.csv` to the Cloud Storage bucket you created in the previous tutorial
* Create an empty BigQuery dataset called `precipitation_changes`

You do not need to add any additional Airflow variables, add any additional permissions, or create any other resources.
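
If you would rather create the dataset programmatically than in the Cloud Console, a minimal sketch using the BigQuery client library (assuming application-default credentials) is:

```python
# Create the empty dataset the DAG writes to; the bq CLI or the
# Cloud Console work equally well.
from google.cloud import bigquery

client = bigquery.Client()
client.create_dataset("precipitation_changes", exists_ok=True)
```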

composer/2022_airflow_summit/README.md (modified)

@@ -11,7 +11,11 @@ Running a number of tasks in parallel showcases autoscaling in a Cloud Composer

  ## data_analytics_dag

- Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless
+ Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless. More detailed documentation can be found for this DAG [in the Composer documentation](https://cloud.google.com/composer/docs/data-analytics-googlecloud)
+
+ ## data_analytics_dag_expansion
+
+ This DAG is nearly identical to `data_analytics_dag` only it features a more complex Dataproc job. For more info, refer to the [README](./DATAPROC_EXPANSION_README.md)

  ## retries

Review comment on the new documentation link: "I remember seeing somewhere in the TW tips and guides to hyperlink only the relevant text; just a nit 😄" Reply from leahecole: "omg that's a tip I give other people, good call"

composer/2022_airflow_summit/data_analytics_dag_expansion.py (new file, 140 additions)
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This DAG script is an expansion of data_analytics_dag.py that runs a more
# complex Dataproc job found in data_analytics_process_expansion.py

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "precipitation_changes"
BQ_DESTINATION_TABLE_NAME = "ghcnd_stations_joined"
BQ_NORMALIZED_TABLE_NAME = "ghcnd_stations_normalized"
BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_snow_mean"
BQ_PHX_PRCP_TABLE_NAME = "phx_annual_prcp"
BQ_PHX_SNOW_TABLE_NAME = "phx_annual_snow"
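# Note: these table names are passed to the Dataproc job through
# BATCH_CONFIG["pyspark_batch"]["args"] below; the joined table is
# presumably the job's input and the others its outputs.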

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process_expansion.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PRCP_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_SNOW_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}
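# Illustrative rendering (assumed logical timestamp 2022-08-30T12:00:00+00:00):
# "{{ ts_nodash | lower }}" becomes "20220830t120000", so BATCH_ID renders to
# "data-processing-20220830t120000", which uses only the lowercase characters
# that Dataproc Serverless batch IDs allow.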

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["ghcn-stations-processed.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.ghcnd-stations-new",
        source_format="CSV",
        schema_fields=[
            {"name": "ID", "type": "STRING", "mode": "REQUIRED"},
            {"name": "LATITUDE", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "LONGITUDE", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "ELEVATION", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "STATE", "type": "STRING", "mode": "NULLABLE"},
            {"name": "NAME", "type": "STRING", "mode": "REQUIRED"},
        ],
        write_disposition="WRITE_TRUNCATE",
    )
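    # WRITE_TRUNCATE above replaces the stations table's contents on each run,
    # so re-running the DAG does not append duplicate station rows.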

    with TaskGroup("join_bq_datasets") as bq_join_group:

        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            GHCND_STATIONS_JOIN_QUERY = f"""
            SELECT Stations.ID, Stations.LATITUDE, Stations.LONGITUDE,
            Stations.STATE, Table.DATE, Table.ELEMENT, Table.VALUE
            FROM `{PROJECT_NAME}.{BQ_DESTINATION_DATASET_NAME}.ghcnd-stations-new` AS Stations, {BQ_DATASET_NAME} AS Table
            WHERE Stations.ID = Table.id
            """

            bq_join_stations_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_stations_data_{str(year)}",
                configuration={
                    "query": {
                        "query": GHCND_STATIONS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    load_external_dataset >> bq_join_group >> create_batch

composer/2022_airflow_summit/data_analytics_dag_expansion_test.py (new file, 28 additions)
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import internal_unit_testing


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a recommended confidence check by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """

    import data_analytics_dag_expansion

    internal_unit_testing.assert_has_valid_dag(data_analytics_dag_expansion)
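
To exercise this check locally, you would typically run `pytest data_analytics_dag_expansion_test.py` from the `composer/2022_airflow_summit` directory; note that `internal_unit_testing` is a repo-local test helper, so the repository's noxfile and requirements setup (added in the commits above) is assumed to be in place.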