 15 |  15 | # This PySpark program is trying to answer the question: "How have the rainfall
 16 |  16 | # and snowfall patterns changed in the western US over the past 25 years?"
 17 |  17 |
 18 |     | -import math
 19 |  18 | import sys
 20 |  19 |
    |  20 | +import pandas as pd
    |  21 | +
 21 |  22 | from py4j.protocol import Py4JJavaError
 22 |  23 | from pyspark.sql import SparkSession
 23 |     | -from pyspark.sql.functions import avg, lit, sum, year
 24 |     | -from pyspark.sql.types import StructType
 25 |     | -
    |  24 | +import pyspark.sql.functions as f
    |  25 | +
    |  26 | +# Inverse Distance Weighting (IDW) algorithm
    |  27 | +@f.pandas_udf("YEAR integer, VALUE double", f.PandasUDFType.GROUPED_MAP)
    |  28 | +def phx_dw_compute(year, df) -> pd.DataFrame:
    |  29 | +    # This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance
    |  30 | +    # Weighting, based on each weather station's distance to Phoenix. The closer a station
    |  31 | +    # is to Phoenix, the more heavily its measurement is weighted.
    |  32 | +    #
    |  33 | +    # This function folds the distance equation into the inverse distance factor. The distance equation is:
    |  34 | +    #
    |  35 | +    #     d = sqrt((x1 - x2)^2 + (y1 - y2)^2)
    |  36 | +    #
    |  37 | +    # and the inverse distance factor is:
    |  38 | +    #
    |  39 | +    #     idf = 1 / d^2
    |  40 | +    #
    |  41 | +    # so the square root cancels against the square, and the two combine into:
    |  42 | +    #
    |  43 | +    #     idf = 1 / ((x1 - x2)^2 + (y1 - y2)^2)
    |  44 | +
    |  45 | +    # Latitude and longitude of Phoenix
    |  46 | +    PHX_LATITUDE = 33.4484
    |  47 | +    PHX_LONGITUDE = -112.0740
    |  48 | +
    |  49 | +    inverse_distance_factors = 1.0 / \
    |  50 | +        ((PHX_LATITUDE - df.LATITUDE) ** 2 +
    |  51 | +         (PHX_LONGITUDE - df.LONGITUDE) ** 2)
    |  52 | +
    |  53 | +    # Calculate each station's weight
    |  54 | +    weights = inverse_distance_factors / inverse_distance_factors.sum()
    |  55 | +
    |  56 | +    return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()})
 26 |  57 |
 27 |  58 | if __name__ == "__main__":
 28 |  59 |     # read in the input argument
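A quick aside on the weighting math in phx_dw_compute: because the UDF body is plain pandas, it can be sanity-checked outside Spark. A minimal sketch with made-up station coordinates and amounts (none of these values appear in the change):

    import pandas as pd

    # Three hypothetical stations, nearest to Phoenix first (Phoenix is at 33.4484, -112.0740)
    stations = pd.DataFrame({
        "LATITUDE": [33.5, 34.0, 36.0],
        "LONGITUDE": [-112.0, -111.0, -115.0],
        "ANNUAL_AMOUNT": [200.0, 180.0, 120.0],
    })

    # The combined factor from the UDF: 1 / d^2, with the sqrt and the square cancelled out
    idf = 1.0 / ((33.4484 - stations.LATITUDE) ** 2 + (-112.0740 - stations.LONGITUDE) ** 2)
    weights = idf / idf.sum()

    print(weights.round(4).tolist())                 # [0.9939, 0.0055, 0.0005]
    print((weights * stations.ANNUAL_AMOUNT).sum())  # ~199.85: the nearest station dominates

The weights sum to 1 by construction, so the result always stays within the range of the stations' annual amounts.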
 75 | 106 |
 76 | 107 |     # Extract the year of each date and rename it as YEAR
 77 | 108 |     # This will allow us to merge the data based on the year of each record instead of the full date
 78 |     | -    df = df.withColumn("DATE", year(df.DATE)).withColumnRenamed("DATE", "YEAR")
    | 109 | +    df = df.withColumn("DATE", f.year(df.DATE)).withColumnRenamed("DATE", "YEAR")
 79 | 110 |
 80 | 111 |     # Each year's arithmetic mean of precipitation
 81 | 112 |     prcp_mean_df = (
 82 | 113 |         df.where(df.ELEMENT == "PRCP")
 83 | 114 |         .groupBy("YEAR")
 84 |     | -        .agg(avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
    | 115 | +        .agg(f.avg("VALUE").alias("ANNUAL_PRCP_MEAN"))
 85 | 116 |         .sort("YEAR")
 86 | 117 |     )
 87 | 118 |     print("PRCP mean table")

 91 | 122 |     snow_mean_df = (
 92 | 123 |         df.where(df.ELEMENT == "SNOW")
 93 | 124 |         .groupBy("YEAR")
 94 |     | -        .agg(avg("VALUE").alias("ANNUAL_SNOW_MEAN"))
    | 125 | +        .agg(f.avg("VALUE").alias("ANNUAL_SNOW_MEAN"))
 95 | 126 |         .sort("YEAR")
 96 | 127 |     )
 97 | 128 |     print("SNOW mean table")
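For reference, the year-extraction and per-year mean above reduce to this self-contained sketch (toy rows, not project data; it assumes DATE arrives as a string, hence the to_date cast):

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as f

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    tiny = spark.createDataFrame(
        [("1997-01-01", "PRCP", 10.0), ("1997-06-01", "PRCP", 30.0), ("1998-01-01", "PRCP", 50.0)],
        ["DATE", "ELEMENT", "VALUE"],
    )

    # Same two steps as the hunk above: replace DATE with its year, then rename the column
    tiny = tiny.withColumn("DATE", f.year(f.to_date("DATE"))).withColumnRenamed("DATE", "YEAR")
    tiny.where(tiny.ELEMENT == "PRCP").groupBy("YEAR").agg(
        f.avg("VALUE").alias("ANNUAL_PRCP_MEAN")
    ).sort("YEAR").show()
    # YEAR 1997 -> 20.0 (mean of 10 and 30); YEAR 1998 -> 50.0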
101 | 132 |     states_near_phx = ["AZ", "CA", "CO", "NM", "NV", "UT"]
102 | 133 |     annual_df = df.where(df.STATE.isin(states_near_phx))
103 | 134 |
104 |     | -    # Latitude and longitude of Phoenix, which are needed to perform DWA
105 |     | -    phx_location = [33.4484, -112.0740]
106 |     | -
107 |     | -    # Create blank tables for storing the results of the distance weighting algorithm (DWA)
108 |     | -    # The tables will have the following format:
109 |     | -    # +------------------+---+------------------+
110 |     | -    # |PHX_PRCP/SNOW_1997|...|PHX_PRCP/SNOW_2021|
111 |     | -    # +------------------+---+------------------+
112 |     | -    # |DWA result (float)|...|DWA result (float)|
113 |     | -    # +------------------+---+------------------+
114 |     | -    phx_annual_prcp_df = spark.createDataFrame([[]], StructType([]))
115 |     | -    phx_annual_snow_df = spark.createDataFrame([[]], StructType([]))
116 |     | -
117 |     | -    # Distance weighting algorithm (DWA)
118 |     | -    def phx_dw_compute(input_list) -> float:
119 |     | -        # input_list is a list of Row objects with the format:
120 |     | -        # [
121 |     | -        #     Row(ID='...', LATITUDE='...', LONGITUDE='...', YEAR='...', ANNUAL_PRCP/ANNUAL_SNOW='...'),
122 |     | -        #     Row(ID='...', LATITUDE='...', LONGITUDE='...', YEAR='...', ANNUAL_PRCP/ANNUAL_SNOW='...'),
123 |     | -        #     ...
124 |     | -        # ]
125 |     | -
126 |     | -        # Contains 1 / d^2 of each station
127 |     | -        factor_list = []
128 |     | -        # The total sum of 1 / d^2 of all the stations
129 |     | -        factor_sum = 0.0
130 |     | -        for row in input_list:
131 |     | -            latitude = row[1]
132 |     | -            longitude = row[2]
133 |     | -            # Calculate the distance from each station to Phoenix
134 |     | -            distance_to_phx = math.sqrt(
135 |     | -                (phx_location[0] - latitude) ** 2 + (phx_location[1] - longitude) ** 2
136 |     | -            )
137 |     | -            # Calculate the distance factor of each station (1 / d^2)
138 |     | -            distance_factor = 1.0 / (distance_to_phx ** 2)
139 |     | -            factor_list.append(distance_factor)
140 |     | -            factor_sum += distance_factor
141 |     | -
142 |     | -        # Contains the weights of each station
143 |     | -        weights_list = [val / factor_sum for val in factor_list]
144 |     | -
145 |     | -        dwa_result = 0.0
146 |     | -        for row in input_list:
147 |     | -            # This is the annual precipitation/snowfall of each station
148 |     | -            annual_value = row[4]
149 |     | -            # Weight of each station
150 |     | -            weight = weights_list[input_list.index(row)]
151 |     | -            dwa_result += weight * annual_value
152 |     | -
153 |     | -        return dwa_result
154 |     | -
155 |     | -    for year_val in range(1997, 2022):
156 |     | -        # collect() returns a list of Row objects.
157 |     | -        # prcp_year and snow_year will be the input for the distance weighting algorithm
158 |     | -        prcp_year = (
159 |     | -            annual_df.where(
160 |     | -                (annual_df.ELEMENT == "PRCP") & (annual_df.YEAR == year_val)
161 |     | -            )
162 |     | -            .groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
163 |     | -            .agg(sum("VALUE").alias("ANNUAL_PRCP"))
164 |     | -            .collect()
165 |     | -        )
166 |     | -
167 |     | -        snow_year = (
168 |     | -            annual_df.where(
169 |     | -                (annual_df.ELEMENT == "SNOW") & (annual_df.YEAR == year_val)
170 |     | -            )
171 |     | -            .groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
172 |     | -            .agg(sum("VALUE").alias("ANNUAL_SNOW"))
173 |     | -            .collect()
    | 135 | +    # Calculate the distance-weighted precipitation amount
    | 136 | +    phx_annual_prcp_df = (
    | 137 | +        annual_df.where(
    | 138 | +            (annual_df.ELEMENT == "PRCP")
174 | 139 |         )
    | 140 | +        .groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
    | 141 | +        .agg(f.sum("VALUE").alias("ANNUAL_AMOUNT"))
    | 142 | +        .groupBy("YEAR")
    | 143 | +        .apply(phx_dw_compute)
    | 144 | +    )
175 | 145 |
176 |     | -        phx_annual_prcp_df = phx_annual_prcp_df.withColumn(
177 |     | -            f"PHX_PRCP_{year_val}", lit(phx_dw_compute(prcp_year))
178 |     | -        )
179 |     | -        phx_annual_snow_df = phx_annual_snow_df.withColumn(
180 |     | -            f"PHX_SNOW_{year_val}", lit(phx_dw_compute(snow_year))
    | 146 | +    # Calculate the distance-weighted snowfall amount
    | 147 | +    phx_annual_snow_df = (
    | 148 | +        annual_df.where(
    | 149 | +            (annual_df.ELEMENT == "SNOW")
181 | 150 |         )
    | 151 | +        .groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
    | 152 | +        .agg(f.sum("VALUE").alias("ANNUAL_AMOUNT"))
    | 153 | +        .groupBy("YEAR")
    | 154 | +        .apply(phx_dw_compute)
    | 155 | +    )
182 | 156 |
183 |     | -    # This table has only two rows (the first row contains the columns)
    | 157 | +    # Display the tables
184 | 158 |     phx_annual_prcp_df.show()
185 | 159 |     phx_annual_snow_df.show()
186 | 160 |
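A possible follow-up, offered as a hedged suggestion rather than part of this change: PandasUDFType.GROUPED_MAP and GroupedData.apply are deprecated as of Spark 3.0 in favor of applyInPandas. If the project targets Spark 3.0+, the same grouped-map step could be written roughly as:

    import pandas as pd
    import pyspark.sql.functions as f

    # Plain function: applyInPandas passes the grouping key and the group's pandas DataFrame
    def phx_dw_compute(key, pdf: pd.DataFrame) -> pd.DataFrame:
        idf = 1.0 / ((33.4484 - pdf.LATITUDE) ** 2 + (-112.0740 - pdf.LONGITUDE) ** 2)
        weights = idf / idf.sum()
        return pd.DataFrame({"YEAR": key, "VALUE": [(weights * pdf.ANNUAL_AMOUNT).sum()]})

    phx_annual_prcp_df = (
        annual_df.where(annual_df.ELEMENT == "PRCP")
        .groupBy("ID", "LATITUDE", "LONGITUDE", "YEAR")
        .agg(f.sum("VALUE").alias("ANNUAL_AMOUNT"))
        .groupBy("YEAR")
        .applyInPandas(phx_dw_compute, schema="YEAR integer, VALUE double")
    )

The behavior is the same; only the registration mechanism changes, with the output schema moving from the decorator to the applyInPandas call.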