Skip to content

Bmiro kaiyang edit #8350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 57 additions & 83 deletions composer/2022_airflow_summit/data_analytics_process_expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,45 @@
# This PySpark program is trying to answer the question: "How has the rainfall
# and snowfall patterns changed in the western US for the past 25 years?"

import math
import sys

import pandas as pd

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, lit, sum, year
from pyspark.sql.types import StructType

import pyspark.sql.functions as f

# Inverse Distance Weighting algorithm (DWA)
@f.pandas_udf("YEAR integer, VALUE double", f.PandasUDFType.GROUPED_MAP)
def phx_dw_compute(year, df) -> pd.DataFrame:
# This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance Weighting
# based on each weather station's distance to Phoenix. The closer a station is to Phoenix, the higher
# its measurement is weighed.
#
# This function combines the distance equation and inverse distance factor since the distance equation is:
#
# d = sqrt((x1-x2)^2 + (y1-y2)^2))
#
# and the inverse distance factor is:
#
# idf = 1 / d^2
#
# so we negate the square and square root to combine this into:
#
# idf = 1 / ((x1-x2)^2 + (y1-y2)^2))

# Latitude and longitude of Phoenix
PHX_LATITUDE = 33.4484
PHX_LONGITUDE = -112.0740

inverse_distance_factors = 1.0 / \
((PHX_LATITUDE - df.LATITUDE) ** 2 +
(PHX_LONGITUDE - df.LONGITUDE) ** 2)

# Calculate each station's weight
weights = inverse_distance_factors / inverse_distance_factors.sum()

return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()})

if __name__ == "__main__":
# read in the input argument
Expand Down Expand Up @@ -75,13 +106,13 @@

# Extract the year of each date and rename it as YEAR
# This will allow us to merge the data based on the years they are created instead of date
df = df.withColumn("DATE", year(df.DATE)).withColumnRenamed("DATE", "YEAR")
df = df.withColumn("DATE", f.year(df.DATE)).withColumnRenamed("DATE", "YEAR")

# Each year's arithmetic mean of precipitation
prcp_mean_df = (
df.where(df.ELEMENT == "PRCP")
.groupBy("YEAR")
.agg(avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
.agg(f.avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
.sort("YEAR")
)
print("PRCP mean table")
Expand All @@ -91,7 +122,7 @@
snow_mean_df = (
df.where(df.ELEMENT == "SNOW")
.groupBy("YEAR")
.agg(avg("VALUE").alias("ANNUAL_SNOW_MEAN"))
.agg(f.avg("VALUE").alias("ANNUAL_SNOW_MEAN"))
.sort("YEAR")
)
print("SNOW mean table")
Expand All @@ -101,86 +132,29 @@
states_near_phx = ["AZ", "CA", "CO", "NM", "NV", "UT"]
annual_df = df.where(df.STATE.isin(states_near_phx))

# Latitude and longitude of Phoenix, which are needed to perform DWA
phx_location = [33.4484, -112.0740]

# Create blank tables for storing the results of the distance weighting algorithm (DWA)
# The tables will have the following format
# +------------------+-------------------+------------------+
# |PHX_PRCP/SNOW_1997|... |PHX_PRCP/SNOW_2021|
# +------------------+-------------------+------------------+
# |DWA result (float)|... |DWA result (float)|
# +------------------+-------------------+------------------+
phx_annual_prcp_df = spark.createDataFrame([[]], StructType([]))
phx_annual_snow_df = spark.createDataFrame([[]], StructType([]))

# Distance weighting algorithm (DWA)
def phx_dw_compute(input_list) -> float:
# Input_list is a list of Row object with format:
# [
# ROW(ID='...', LATITUDE='...', LONGITUDE='...', YEAR='...', ANNUAL_PRCP/ANNUAL_SNOW='...'),
# ROW(ID='...', LATITUDE='...', LONGITUDE='...', YEAR='...', ANNUAL_PRCP/ANNUAL_SNOW='...'),
# ...
# ]

# Contains 1 / d^2 of each station
factor_list = []
# The total sum of 1 / d^2 of all the stations
factor_sum = 0.0
for row in input_list:
latitude = row[1]
longitude = row[2]
# Calculate the distance from each station to Phoenix
distance_to_phx = math.sqrt(
(phx_location[0] - latitude) ** 2 + (phx_location[1] - longitude) ** 2
)
# Calculate the distance factor of each station (1 / d^2)
distance_factor = 1.0 / (distance_to_phx**2)
factor_list.append(distance_factor)
factor_sum += distance_factor

# Contains the weights of each station
weights_list = [val / factor_sum for val in factor_list]

dwa_result = 0.0
for row in input_list:
# This is the annual prcipitation/snowfall of each station
annual_value = row[4]
# Weight of each station
weight = weights_list[input_list.index(row)]
dwa_result += weight * annual_value

return dwa_result

for year_val in range(1997, 2022):
# Collect() function returns a list of Row object.
# prcp_year and snow_year will be the input for the distance weighting algorithm
prcp_year = (
annual_df.where(
(annual_df.ELEMENT == "PRCP") & (annual_df.YEAR == year_val)
)
.groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
.agg(sum("VALUE").alias("ANNUAL_PRCP"))
.collect()
)

snow_year = (
annual_df.where(
(annual_df.ELEMENT == "SNOW") & (annual_df.YEAR == year_val)
)
.groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
.agg(sum("VALUE").alias("ANNUAL_SNOW"))
.collect()
# Calculate the distance-weighted precipitation amount
phx_annual_prcp_df = (
annual_df.where(
(annual_df.ELEMENT == "PRCP")
)
.groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
.agg(f.sum("VALUE").alias("ANNUAL_AMOUNT"))
.groupBy("YEAR")
.apply(phx_dw_compute)
)

phx_annual_prcp_df = phx_annual_prcp_df.withColumn(
f"PHX_PRCP_{year_val}", lit(phx_dw_compute(prcp_year))
)
phx_annual_snow_df = phx_annual_snow_df.withColumn(
f"PHX_SNOW_{year_val}", lit(phx_dw_compute(snow_year))
# Calculate the distance-weighted snowfall amount
phx_annual_snow_df = (
annual_df.where(
(annual_df.ELEMENT == "SNOW")
)
.groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
.agg(f.sum("VALUE").alias("ANNUAL_AMOUNT"))
.groupBy("YEAR")
.apply(phx_dw_compute)
)

# This table has only two rows (the first row contains the columns)
# Display the tables
phx_annual_prcp_df.show()
phx_annual_snow_df.show()

Expand Down