
Commit b2d35bf

leahecole and dandhlee authored
Apply suggestions from code review
Co-authored-by: Dan Lee <[email protected]>
1 parent c182fa8 commit b2d35bf

File tree

3 files changed (+8 / -10 lines)


composer/2022_airflow_summit/DATAPROC_EXPANSION_README.md

Lines changed: 3 additions & 3 deletions
@@ -10,7 +10,7 @@ Go through the tutorial to [Run a data analytics DAG in Google Cloud](https://cl
## About this example

-This directory has a DAG similar to the data analytics DAG found in the [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud) tutorial, but includes a more complicated data processing step with Dataproc. Instead of answering the question, "How warm was it in Chicago on Thanksgiving for the past 25 years?" we will answer the question, "How have the rainfall patterns changed over the past 25 years in the western part of the US and in Phoenix, AZ?" We define the western part of the US as the [census defined West region](https://www2.census.gov/geo/pdfs/maps-data/maps/reference/us_regdiv.pdf), and chose Phoenix as it is a city that has been affected by climate change in recent years, especially with respect to water.
+This directory has a DAG similar to the data analytics DAG found in the [Run a data analytics DAG in Google Cloud](https://cloud.google.com/composer/docs/data-analytics-googlecloud) tutorial, but includes a more complicated data processing step with Dataproc. Instead of answering the question, "How warm was it in Chicago on Thanksgiving for the past 25 years?" you will answer the question, "How have the rainfall patterns changed over the past 25 years in the western part of the US and in Phoenix, AZ?" For this example, the western part of the US is defined as the [census defined West region](https://www2.census.gov/geo/pdfs/maps-data/maps/reference/us_regdiv.pdf). Phoenix is used in this example because it is a city that has been affected by climate change in recent years, especially with respect to water.

The Dataproc Serverless job uses [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) to calculate precipitation and snowfall in the western states, and uses [distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) to focus on the Phoenix specific area.
@@ -23,13 +23,13 @@ The DAG has three steps:
1. Removing any data points that are not from weather stations located in the Western US
2. Removing any data points that are not about snow or other precipitation (data where `ELEMENT` is not `SNOW` or `PRCP`)
3. Convert the values in the `ELEMENT` column (now equal to `SNOW` or `PRCP`) to be in mm, instead of tenths of a mm.
-4. Extract the year from the date so the Date column is left only with the year
+4. Extract the year from the date so the `Date` column is left only with the year
5. Calculate the [arithmetic mean](https://www.weather.gov/abrfc/map#arithmetic_mean) of precipitation and of snowfall
6. Calculate the [distance weighting](https://www.weather.gov/abrfc/map#distance_weighting) for Phoenix.
7. Write the results to tables in BigQuery

## Running this sample
-* Add `data_analytics_dag_expansion.py` to the Composer environent you used in the previous tutorial
+* Add `data_analytics_dag_expansion.py` to the Composer environment you used in the previous tutorial
* Add `data_analytics_process_expansion.py` and `ghcn-stations-processed.csv` to the Cloud Storage bucket you created in the previous tutorial
* Create an empty BigQuery dataset called `precipitation_changes`
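For orientation, the per-year aggregation that steps 1–5 of the README describe boils down to a filter-and-average over the GHCN records. The sketch below is a rough, hypothetical PySpark version of those steps, not the code in `data_analytics_process_expansion.py`; the input path and the `STATE` column are assumptions made for the example (the real job derives the state from the station metadata file).

```python
# Hypothetical sketch of README steps 1-5, assuming the records already carry
# ID, STATE, DATE, ELEMENT and VALUE columns.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, substring

spark = SparkSession.builder.appName("precipitation-trends-sketch").getOrCreate()
df = spark.read.csv("gs://example-bucket/ghcn-daily.csv", header=True, inferSchema=True)

western_states = ["AZ", "CA", "CO", "ID", "MT", "NM", "NV", "OR", "UT", "WA", "WY"]

annual_prcp_mean = (
    df.filter(col("STATE").isin(western_states))        # 1. western stations only
    .filter(col("ELEMENT").isin("PRCP", "SNOW"))         # 2. keep snow/precipitation rows
    .withColumn("VALUE", col("VALUE") / 10)              # 3. tenths of a mm -> mm
    .withColumn("YEAR", substring(col("DATE"), 1, 4))    # 4. keep only the year
    .filter(col("ELEMENT") == "PRCP")
    .groupBy("YEAR")                                      # 5. arithmetic mean per year
    .agg(avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
    .sort("YEAR")
)
annual_prcp_mean.show(n=50)
```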

composer/2022_airflow_summit/data_analytics_dag_expansion.py

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@
# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
-PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
+PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process_expansion.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
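Elsewhere in the DAG (not shown in the hunk above), these constants feed the Dataproc Serverless batch task. As a rough illustration only, a `DataprocCreateBatchOperator` from the `apache-airflow-providers-google` package could be wired to them roughly as follows; the `task_id`, `region`, and `project_id` values are placeholders, not taken from the actual file.

```python
# Hypothetical sketch only: how BUCKET_NAME / PYSPARK_JAR / PROCESSING_PYTHON_FILE /
# BATCH_ID could feed a Dataproc Serverless batch task. Values marked as
# placeholders are assumptions, not taken from data_analytics_dag_expansion.py.
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id="{{var.value.gcp_project}}",  # placeholder
    region="us-central1",                    # placeholder
    batch_id=BATCH_ID,  # Dataproc Serverless requires lowercase batch IDs
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PROCESSING_PYTHON_FILE,
            "jar_file_uris": [PYSPARK_JAR],
        },
    },
)
```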

composer/2022_airflow_summit/data_analytics_process_expansion.py

Lines changed: 4 additions & 6 deletions
@@ -53,7 +53,7 @@
except Py4JJavaError:
    raise Exception(f"Error reading {READ_TABLE}")

-# Since our goal is to focus on the western US, we first filter out non-western states of the US.
+# Since the goal is to focus on the western US, you first filter out non-western states of the US.
# The definition of western US can be found in the following link:
# https://www2.census.gov/geo/pdfs/maps-data/maps/reference/us_regdiv.pdf
western_states = ["AZ", "CA", "CO", "ID", "MT", "NM", "NV", "OR", "UT", "WA", "WY"]
@@ -84,7 +84,7 @@
    .agg(avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
    .sort("YEAR")
)
-print("prcp mean table")
+print("PRCP mean table")
prcp_mean_df.show(n=50)

# Each year's arithmetic mean of snowfall
# Each year's arithmetic mean of snowfall
@@ -94,7 +94,7 @@
    .agg(avg("VALUE").alias("ANNUAL_SNOW_MEAN"))
    .sort("YEAR")
)
-print("snow mean table")
+print("SNOW mean table")
snow_mean_df.show(n=50)

# Filter out the states to move on to the distance weighting algorithm (DWA)
# Filter out the states to move on to the distance weighting algorithm (DWA)
@@ -140,9 +140,7 @@ def phx_dw_compute(input_list) -> float:
        factor_sum += distance_factor

    # Contains the weights of each station
-    weights_list = []
-    for val in factor_list:
-        weights_list.append(val / factor_sum)
+    weights_list = [val / factor_sum for val in factor_list]

    dwa_result = 0.0
    for row in input_list:
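The replacement line in this hunk simply normalizes each station's distance factor into a weight. As a self-contained illustration of the distance-weighting idea (not the actual `phx_dw_compute` body; the distances and values below are invented for the example):

```python
# Standalone sketch of inverse distance weighting: nearby stations contribute
# more to the estimate for the target point (Phoenix in this sample).
# All numbers below are made up for illustration.
def idw_estimate(stations: list[tuple[float, float]], power: float = 2.0) -> float:
    """stations holds (distance_km, observed_value) pairs for one year."""
    factor_list = [1.0 / (distance ** power) for distance, _ in stations]
    factor_sum = sum(factor_list)

    # Same shape as the list comprehension in the diff: normalize the factors
    # so the weights sum to 1.
    weights_list = [val / factor_sum for val in factor_list]

    # Weighted average of the observed values.
    return sum(w * value for w, (_, value) in zip(weights_list, stations))


print(idw_estimate([(5.0, 210.0), (12.0, 180.0), (40.0, 95.0)]))
```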
