Skip to content

Commit 72c6c08

Browse files
committed
run black on process files
1 parent f3602d1 commit 72c6c08

File tree

2 files changed

+31
-37
lines changed

2 files changed

+31
-37
lines changed

composer/2022_airflow_summit/data_analytics_process_expansion.py

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,35 @@
2626
# Inverse Distance Weighting algorithm (IDW)
2727
@f.pandas_udf("YEAR integer, VALUE double", f.PandasUDFType.GROUPED_MAP)
2828
def phx_dw_compute(year, df) -> pd.DataFrame:
29-
# This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance Weighting
30-
# based on each weather station's distance to Phoenix. The closer a station is to Phoenix, the higher
31-
# its measurement is weighed.
32-
#
33-
# This function combines the distance equation and inverse distance factor since the distance equation is:
34-
#
35-
# d = sqrt((x1-x2)^2 + (y1-y2)^2)
36-
#
37-
# and the inverse distance factor is:
38-
#
39-
# idf = 1 / d^2
40-
#
41-
# so the square and the square root cancel, letting us combine these into:
42-
#
43-
# idf = 1 / ((x1-x2)^2 + (y1-y2)^2)
44-
45-
# Latitude and longitude of Phoenix
46-
PHX_LATITUDE = 33.4484
47-
PHX_LONGITUDE = -112.0740
48-
49-
inverse_distance_factors = 1.0 / \
50-
((PHX_LATITUDE - df.LATITUDE) ** 2 +
51-
(PHX_LONGITUDE - df.LONGITUDE) ** 2)
52-
53-
# Calculate each station's weight
54-
weights = inverse_distance_factors / inverse_distance_factors.sum()
55-
56-
return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()})
29+
# This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance Weighting
30+
# based on each weather station's distance to Phoenix. The closer a station is to Phoenix, the higher
31+
# its measurement is weighed.
32+
#
33+
# This function combines the distance equation and inverse distance factor since the distance equation is:
34+
#
35+
# d = sqrt((x1-x2)^2 + (y1-y2)^2)
36+
#
37+
# and the inverse distance factor is:
38+
#
39+
# idf = 1 / d^2
40+
#
41+
# so the square and the square root cancel, letting us combine these into:
42+
#
43+
# idf = 1 / ((x1-x2)^2 + (y1-y2)^2)
44+
45+
# Latitude and longitude of Phoenix
46+
PHX_LATITUDE = 33.4484
47+
PHX_LONGITUDE = -112.0740
48+
49+
inverse_distance_factors = 1.0 / (
50+
(PHX_LATITUDE - df.LATITUDE) ** 2 + (PHX_LONGITUDE - df.LONGITUDE) ** 2
51+
)
52+
53+
# Calculate each station's weight
54+
weights = inverse_distance_factors / inverse_distance_factors.sum()
55+
56+
return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()})
57+
5758

5859
if __name__ == "__main__":
5960
# read in the input argument
@@ -134,9 +135,7 @@ def phx_dw_compute(year, df) -> pd.DataFrame:
134135

135136
# Calculate the distance-weighted precipitation amount
136137
phx_annual_prcp_df = (
137-
annual_df.where(
138-
(annual_df.ELEMENT == "PRCP")
139-
)
138+
annual_df.where((annual_df.ELEMENT == "PRCP"))
140139
.groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
141140
.agg(f.sum("VALUE").alias("ANNUAL_AMOUNT"))
142141
.groupBy("YEAR")
@@ -145,9 +144,7 @@ def phx_dw_compute(year, df) -> pd.DataFrame:
145144

146145
# Calculate the distance-weighted snowfall amount
147146
phx_annual_snow_df = (
148-
annual_df.where(
149-
(annual_df.ELEMENT == "SNOW")
150-
)
147+
annual_df.where((annual_df.ELEMENT == "SNOW"))
151148
.groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
152149
.agg(f.sum("VALUE").alias("ANNUAL_AMOUNT"))
153150
.groupBy("YEAR")

composer/2022_airflow_summit/data_analytics_process_expansion_test.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ def test_dataproc_batch(test_bucket, bq_dataset):
7373
with pytest.raises(NotFound):
7474
BQ_CLIENT.get_table(f"{BQ_DESTINATION_DATASET_NAME}.{output_table}")
7575

76-
7776
BATCH_ID = f"summit-dag-expansion-test-{TEST_ID}" # Dataproc serverless only allows lowercase characters
7877
BATCH_CONFIG = {
7978
"pyspark_batch": {
@@ -215,10 +214,8 @@ def bq_dataset(test_bucket):
215214
print(f"Ignoring NotFound on cleanup, details: {e}")
216215

217216

218-
219217
def test_process(test_dataproc_batch):
220218
print(test_dataproc_batch)
221-
222219

223220
# check that the results table is there now
224221
assert (

0 commit comments

Comments
 (0)