|
23 | 23 | from pyspark.sql import SparkSession
|
24 | 24 | import pyspark.sql.functions as f
|
25 | 25 |
|
26 |
| -# Inverse Distance Weighting algorithm (DWA) |
27 |
| -@f.pandas_udf("YEAR integer, VALUE double", f.PandasUDFType.GROUPED_MAP) |
28 |
| -def phx_dw_compute(year, df) -> pd.DataFrame: |
29 |
| - # This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance Weighting |
30 |
| - # based on each weather station's distance to Phoenix. The closer a station is to Phoenix, the higher |
31 |
| - # its measurement is weighted. |
32 |
| - # |
33 |
| - # This function combines the distance equation and inverse distance factor since the distance equation is: |
34 |
| - # |
35 |
| - # d = sqrt((x1-x2)^2 + (y1-y2)^2) |
36 |
| - # |
37 |
| - # and the inverse distance factor is: |
38 |
| - # |
39 |
| - # idf = 1 / d^2 |
40 |
| - # |
41 |
| - # so the square and the square root cancel, which combines this into: |
42 |
| - # |
43 |
| - # idf = 1 / ((x1-x2)^2 + (y1-y2)^2) |
44 |
| - |
45 |
| - # Latitude and longitude of Phoenix |
46 |
| - PHX_LATITUDE = 33.4484 |
47 |
| - PHX_LONGITUDE = -112.0740 |
48 |
| - |
49 |
| - inverse_distance_factors = 1.0 / ( |
50 |
| - (PHX_LATITUDE - df.LATITUDE) ** 2 + (PHX_LONGITUDE - df.LONGITUDE) ** 2 |
51 |
| - ) |
52 |
| - |
53 |
| - # Calculate each station's weight |
54 |
| - weights = inverse_distance_factors / inverse_distance_factors.sum() |
55 |
| - |
56 |
| - return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()}) |
57 |
| - |
58 | 26 |
|
59 | 27 | if __name__ == "__main__":
|
60 | 28 | # read in the input argument
|
@@ -133,6 +101,39 @@ def phx_dw_compute(year, df) -> pd.DataFrame:
|
133 | 101 | states_near_phx = ["AZ", "CA", "CO", "NM", "NV", "UT"]
|
134 | 102 | annual_df = df.where(df.STATE.isin(states_near_phx))
|
135 | 103 |
|
| 104 | + # Inverse Distance Weighting algorithm (DWA) |
| 105 | + @f.pandas_udf("YEAR integer, VALUE double", f.PandasUDFType.GROUPED_MAP) |
| 106 | + def phx_dw_compute(year, df) -> pd.DataFrame: |
| 107 | + # This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance Weighting |
| 108 | + # based on each weather station's distance to Phoenix. The closer a station is to Phoenix, the higher |
| 109 | + # its measurement is weighted. |
| 110 | + # |
| 111 | + # This function combines the distance equation and inverse distance factor since the distance equation is: |
| 112 | + # |
| 113 | + # d = sqrt((x1-x2)^2 + (y1-y2)^2) |
| 114 | + # |
| 115 | + # and the inverse distance factor is: |
| 116 | + # |
| 117 | + # idf = 1 / d^2 |
| 118 | + # |
| 119 | + # so the square and the square root cancel, which combines this into: |
| 120 | + # |
| 121 | + # idf = 1 / ((x1-x2)^2 + (y1-y2)^2) |
| 122 | + |
| 123 | + # Latitude and longitude of Phoenix |
| 124 | + PHX_LATITUDE = 33.4484 |
| 125 | + PHX_LONGITUDE = -112.0740 |
| 126 | + |
| 127 | + inverse_distance_factors = 1.0 / ( |
| 128 | + (PHX_LATITUDE - df.LATITUDE) ** 2 + |
| 129 | + (PHX_LONGITUDE - df.LONGITUDE) ** 2 |
| 130 | + ) |
| 131 | + |
| 132 | + # Calculate each station's weight |
| 133 | + weights = inverse_distance_factors / inverse_distance_factors.sum() |
| 134 | + |
| 135 | + return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()}) |
| 136 | + |
136 | 137 | # Calculate the distance-weighted precipitation amount
|
137 | 138 | phx_annual_prcp_df = (
|
138 | 139 | annual_df.where((annual_df.ELEMENT == "PRCP"))
|
|
0 commit comments