data analytics tutorial expansion project #8290


Merged on Sep 28, 2022 (61 commits)
Commits
bcac5d4
Kaiyang expansion project 2022 (#8224)
kaiyang-code Aug 18, 2022
9cc22b9
run blacken on dag and dataproc code
leahecole Aug 19, 2022
4f03254
WIP: not working test for process job
leahecole Aug 19, 2022
73ead05
working test for expansion dataproc script
leahecole Aug 25, 2022
c945c67
move dataproc expansion files to separate directory
leahecole Aug 26, 2022
977211a
add readme
leahecole Aug 26, 2022
0160059
update readme
leahecole Aug 26, 2022
d1f8464
run black
leahecole Aug 26, 2022
d3a8321
ignore data file
leahecole Aug 26, 2022
739847c
fix import order
leahecole Aug 26, 2022
290c5f9
ignore one line of lint because it's being silly
leahecole Aug 26, 2022
c8d6638
add check for Notfound for test
leahecole Aug 29, 2022
f73cf11
add requirements files
leahecole Aug 29, 2022
dbb14b8
add noxfile config
leahecole Aug 29, 2022
3280031
update try/except
leahecole Aug 29, 2022
4ef01c0
experiment - fully qualify path
leahecole Aug 29, 2022
006fc96
update filepath
leahecole Aug 29, 2022
7964dff
update path
leahecole Aug 29, 2022
a91f961
try different path
leahecole Aug 29, 2022
70ecf3b
remove the directory that was causing test problems
leahecole Aug 30, 2022
949f1ea
Merge branch 'main' into kaiyang_expansion_project
leahecole Aug 30, 2022
a84efb9
fix typo in header checker
leahecole Aug 30, 2022
a36c471
Merge branch 'main' into kaiyang_expansion_project
m-strzelczyk Aug 31, 2022
447a3fc
tell folks to skip cleanup of prereq
leahecole Sep 1, 2022
9276dc3
clean up hyperlinks for distance weighting and arithmetic mean
leahecole Sep 1, 2022
51fdb0b
fix math links again
leahecole Sep 1, 2022
7d93e6a
remove debug statements
leahecole Sep 1, 2022
e71c472
remove commented out variables
leahecole Sep 1, 2022
c182fa8
Update composer/2022_airflow_summit/data_analytics_dag_expansion_test.py
leahecole Sep 1, 2022
b2d35bf
Apply suggestions from code review
leahecole Sep 19, 2022
b4fbc66
Apply suggestions from code review
leahecole Sep 19, 2022
6116179
update apache-beam version (#8302)
davidcavazos Aug 31, 2022
a4537c7
dataflow: replace job name underscores with hyphens (#8303)
davidcavazos Aug 31, 2022
0ba0578
chore(deps): update dependency datalab to v1.2.1 (#8309)
renovate-bot Sep 6, 2022
ef78c07
fix: unsanitized output (#8316)
engelke Sep 6, 2022
cce48dd
chore(deps): update dependency cryptography to v38 (#8317)
renovate-bot Sep 7, 2022
f6580d8
Remove region tags to be consistent with other languages (#8322)
averikitsch Sep 15, 2022
3e94a27
fix lint in conftest (#8324)
leahecole Sep 15, 2022
3d017a2
Pin perl version to 5.34.0 as latest doesn't work with the example. (…
mhenc Sep 15, 2022
2f6a4ee
refactor fixtures
leahecole Sep 16, 2022
cbabba5
revert last change
leahecole Sep 16, 2022
ee70c73
revert last change
leahecole Sep 16, 2022
7a8b0c0
chore(deps): update dependency tensorflow to v2.7.2 [security] (#8329)
renovate-bot Sep 19, 2022
0af547f
remove backoff, add manual retry (#8328)
leahecole Sep 20, 2022
846570f
Merge branch 'main' into kaiyang_expansion_project
leahecole Sep 20, 2022
dba937c
refactor test to match #8328
leahecole Sep 20, 2022
1ce4787
update most write methods, fix test issue with comparing to exception
leahecole Sep 20, 2022
64537b4
Bmiro kaiyang edit (#8350)
bradmiro Sep 23, 2022
f3602d1
Merge branch 'main' into kaiyang_expansion_project
leahecole Sep 23, 2022
72c6c08
run black on process files
leahecole Sep 23, 2022
0af7f90
fix relative import issue
leahecole Sep 23, 2022
8a017d0
Merge branch 'main' into kaiyang_expansion_project
leahecole Sep 27, 2022
80e3f12
fixed jvm error (#8360)
bradmiro Sep 27, 2022
c531096
Add UDF type hinting (#8361)
bradmiro Sep 27, 2022
f6b951d
Update composer/2022_airflow_summit/data_analytics_process_expansion.py
leahecole Sep 27, 2022
8c345a5
fix comment alignment
leahecole Sep 27, 2022
5e0b214
Merge branch 'main' into kaiyang_expansion_project
leahecole Sep 28, 2022
a486f78
change dataproc region to northamerica-northeast1
leahecole Sep 28, 2022
fb6793f
Merge branch 'main' into kaiyang_expansion_project
leahecole Sep 28, 2022
a00f400
refactor import
leahecole Sep 28, 2022
9302893
switch other test to also use northamerica-northeast1
leahecole Sep 28, 2022
2 changes: 1 addition & 1 deletion .github/header-checker-lint.yml
@@ -15,7 +15,7 @@ ignoreFiles:
- "**/constraints.txt"
- "**/constraints-test.txt"
- "**/apt.txt"
- "**/ghcn-stations.txt"
- "**/ghcnd-stations.txt"


sourceFileExtensions:
36 changes: 36 additions & 0 deletions composer/2022_airflow_summit/DATAPROC_EXPANSION_README.md
@@ -0,0 +1,36 @@
# Dataproc extension for the Data Analytics Example

## Data in this directory
* [`ghcnd-stations.txt`](./ghcnd-stations.txt) is a freely available dataset of weather station metadata used in [US government climate data](https://www.ncei.noaa.gov/access/metadata/landing-page/bin/iso?id=gov.noaa.ncdc:C00861). A direct download link is available on that page.
* [`ghcn-stations-processed.csv`](./ghcn-stations-processed.csv) is generated from the `ghcnd-stations.txt` text file. To generate this file yourself, run `python data_processing_helper.py` from this directory (a hypothetical sketch of the conversion is shown below).
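
The exact contents of `data_processing_helper.py` are not reproduced here; the following is a hypothetical sketch of the conversion, assuming the fixed-width column layout documented in the NOAA GHCN-D readme and the six columns the DAG's ingestion schema expects (`ID`, `LATITUDE`, `LONGITUDE`, `ELEVATION`, `STATE`, `NAME`).

```python
# Hypothetical sketch only; the real data_processing_helper.py may differ.
# Column slices follow the fixed-width layout in the NOAA GHCN-D readme.
import csv

with open("ghcnd-stations.txt") as src, open(
    "ghcn-stations-processed.csv", "w", newline=""
) as dst:
    writer = csv.writer(dst)
    for line in src:
        writer.writerow(
            [
                line[0:11].strip(),   # ID
                line[12:20].strip(),  # LATITUDE
                line[21:30].strip(),  # LONGITUDE
                line[31:37].strip(),  # ELEVATION (meters)
                line[38:40].strip(),  # STATE (blank outside the US)
                line[41:71].strip(),  # NAME
            ]
        )
```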


## Prerequisites
Complete the [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud) tutorial, skipping the cleanup steps.

## About this example

This directory has a DAG similar to the data analytics DAG found in the [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud) tutorial, but it includes a more complicated data processing step with Dataproc. Instead of answering the question, "How warm was it in Chicago on Thanksgiving for the past 25 years?" you will answer the question, "How have rainfall patterns changed over the past 25 years in the western part of the US and in Phoenix, AZ?" For this example, the western part of the US is defined as the [census-defined West region](https://www2.census.gov/geo/pdfs/maps-data/maps/reference/us_regdiv.pdf). Phoenix is included because it is a city that has been notably affected by climate change in recent years, especially with respect to water.

The Dataproc Serverless job uses the [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) to calculate precipitation and snowfall across the western states, and uses [distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) to produce estimates specific to the Phoenix area.
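
To make the difference between the two estimators concrete, here is a small, self-contained illustration. It is not part of the DAG; the readings and distances are made up, and inverse-square weights are assumed as one common form of distance weighting.

```python
# Made-up readings: (precipitation in mm, distance from Phoenix in km).
stations = [(12.0, 5.0), (8.0, 20.0), (30.0, 60.0)]

# Arithmetic mean: every station contributes equally.
arithmetic_mean = sum(value for value, _ in stations) / len(stations)

# Inverse-distance weighting: closer stations dominate the estimate.
weights = [1 / distance**2 for _, distance in stations]
distance_weighted = sum(
    value * weight for (value, _), weight in zip(stations, weights)
) / sum(weights)

print(f"arithmetic mean:   {arithmetic_mean:.2f} mm")   # 16.67 mm
print(f"distance weighted: {distance_weighted:.2f} mm")  # ~11.88 mm; nearby stations dominate
```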


The DAG has three steps:

1. Ingest the data about the weather stations from Cloud Storage into BigQuery
2. Use BigQuery to join the weather station data with the data used in the prior tutorial, the [GHCN data](https://console.cloud.google.com/marketplace/details/noaa-public/ghcn-d?_ga=2.256175883.1820196808.1661536029-806997694.1661364277), and write the results to a table
3. Run a Dataproc Serverless job that processes the data by (see the PySpark sketch after this list)
    1. Removing any data points that are not from weather stations located in the western US
    2. Removing any data points that are not about snow or other precipitation (data where `ELEMENT` is not `SNOW` or `PRCP`)
    3. Converting the remaining values (rows where `ELEMENT` is `SNOW` or `PRCP`) from tenths of a millimeter to millimeters
    4. Extracting the year from the date so that the `DATE` column contains only the year
    5. Calculating the [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) of precipitation and of snowfall
    6. Calculating the [distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) for Phoenix
    7. Writing the results to tables in BigQuery

## Running this sample
* Add `data_analytics_dag_expansion.py` to the Composer environment you used in the previous tutorial
* Add `data_analytics_process_expansion.py` and `ghcn-stations-processed.csv` to the Cloud Storage bucket you created in the previous tutorial
* Create an empty BigQuery dataset called `precipitation_changes`

You do not need to add any additional Airflow variables, grant any additional permissions, or create any other resources.
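
If you prefer to script that setup, a minimal sketch using the Google Cloud Python client libraries might look like the following. The bucket name is a placeholder for the bucket from the previous tutorial, and the DAG file itself still goes into your Composer environment's DAGs folder (for example with `gcloud composer environments storage dags import`).

```python
# Setup sketch; BUCKET_NAME is a placeholder for the bucket created in the
# previous tutorial.
from google.cloud import bigquery, storage

BUCKET_NAME = "your-composer-tutorial-bucket"

# Create the empty dataset the DAG writes its results to.
bigquery.Client().create_dataset("precipitation_changes", exists_ok=True)

# Upload the processing script and the processed station data to the bucket.
bucket = storage.Client().bucket(BUCKET_NAME)
for filename in ("data_analytics_process_expansion.py", "ghcn-stations-processed.csv"):
    bucket.blob(filename).upload_from_filename(filename)
```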
6 changes: 5 additions & 1 deletion composer/2022_airflow_summit/README.md
@@ -11,7 +11,11 @@ Running a number of tasks in parallel showcases autoscaling in a Cloud Composer

## data_analytics_dag

Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless
Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless. More detailed documentation can be found for this DAG [in the Composer documentation](https://cloud.google.com/composer/docs/data-analytics-googlecloud)
Collaborator:

Suggested change
Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless. More detailed documentation can be found for this DAG [in the Composer documentation](https://cloud.google.com/composer/docs/data-analytics-googlecloud)
Runs a basic Data Analytics workflow using BigQuery, Cloud Storage, and Dataproc Serverless. More detailed documentation can be found for this DAG in the [Composer documentation](https://cloud.google.com/composer/docs/data-analytics-googlecloud)

I remember seeing somewhere in the TW tips and guides to hyperlink only the relevant text; just a nit 😄

Collaborator Author:

omg that's a tip I give other people good call

## data_analytics_dag_expansion

This DAG is nearly identical to `data_analytics_dag`, except that it features a more complex Dataproc job. For more information, refer to the [README](./DATAPROC_EXPANSION_README.md).

## retries

140 changes: 140 additions & 0 deletions composer/2022_airflow_summit/data_analytics_dag_expansion.py
@@ -0,0 +1,140 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This DAG script is an expansion of data_analytics_dag.py that runs a more complex Dataproc job found in data_analytics_process_expansion.py

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "precipitation_changes"
BQ_DESTINATION_TABLE_NAME = "ghcnd_stations_joined"
BQ_NORMALIZED_TABLE_NAME = "ghcnd_stations_normalized"
BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_snow_mean"
BQ_PHX_PRCP_TABLE_NAME = "phx_annual_prcp"
BQ_PHX_SNOW_TABLE_NAME = "phx_annual_snow"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process_expansion.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}" # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PRCP_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_SNOW_MEAN_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["ghcn-stations-processed.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.ghcnd-stations-new",
        source_format="CSV",
        schema_fields=[
            {"name": "ID", "type": "STRING", "mode": "REQUIRED"},
            {"name": "LATITUDE", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "LONGITUDE", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "ELEVATION", "type": "FLOAT", "mode": "REQUIRED"},
            {"name": "STATE", "type": "STRING", "mode": "NULLABLE"},
            {"name": "NAME", "type": "STRING", "mode": "REQUIRED"},
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:

        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            GHCND_STATIONS_JOIN_QUERY = f"""
            SELECT Stations.ID, Stations.LATITUDE, Stations.LONGITUDE,
            Stations.STATE, Table.DATE, Table.ELEMENT, Table.VALUE
            FROM `{PROJECT_NAME}.{BQ_DESTINATION_DATASET_NAME}.ghcnd-stations-new` AS Stations, {BQ_DATASET_NAME} AS Table
            WHERE Stations.ID = Table.id
            """

            bq_join_stations_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_stations_data_{str(year)}",
                configuration={
                    "query": {
                        "query": GHCND_STATIONS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    load_external_dataset >> bq_join_group >> create_batch
28 changes: 28 additions & 0 deletions composer/2022_airflow_summit/data_analytics_dag_expansion_test.py
@@ -0,0 +1,28 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import internal_unit_testing


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a recommended confidence check by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """

    import data_analytics_dag_expansion

    internal_unit_testing.assert_has_valid_dag(data_analytics_dag_expansion)