Commit 1ce4787

update most write methods, fix test issue with comparing to exception

1 parent: dba937c

File tree: 3 files changed, 9 additions & 9 deletions

composer/2022_airflow_summit/data_analytics_process_expansion.py

Lines changed: 4 additions & 4 deletions
@@ -197,14 +197,14 @@ def phx_dw_compute(input_list) -> float:
     # for other save mode options
     (
         df.write.format("bigquery")
-        .option("temporaryGcsBucket", temp_path)
+        .option("writeMethod", "direct")
         .mode("overwrite")
         .save(DF_WRITE_TABLE)
     )

     (
         prcp_mean_df.write.format("bigquery")
-        .option("temporaryGcsBucket", temp_path)
+        .option("writeMethod", "direct")
         .mode("overwrite")
         .save(PRCP_MEAN_WRITE_TABLE)
     )
@@ -218,14 +218,14 @@ def phx_dw_compute(input_list) -> float:

     (
         phx_annual_prcp_df.write.format("bigquery")
-        .option("temporaryGcsBucket", temp_path)
+        .option("writeMethod", "direct")
         .mode("overwrite")
         .save(PHX_PRCP_WRITE_TABLE)
     )

     (
         phx_annual_snow_df.write.format("bigquery")
-        .option("temporaryGcsBucket", temp_path)
+        .option("writeMethod", "direct")
         .mode("overwrite")
         .save(PHX_SNOW_WRITE_TABLE)
     )
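
Note: the four hunks above all make the same swap, from the connector's indirect write (staged through a temporary GCS bucket) to its direct write (BigQuery Storage Write API), which also drops temp_path as a dependency of these writes. A minimal, self-contained sketch of the two paths, assuming a SparkSession with the spark-bigquery connector on its classpath; the bucket and table names below are hypothetical placeholders, not values from this repo:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("write-method-demo").getOrCreate()
    df = spark.createDataFrame([(2021, 0.54), (2022, 0.61)], ["year", "prcp_mean"])

    # Indirect write (old path): the connector stages the DataFrame as files
    # in a temporary GCS bucket, then runs a BigQuery load job from them.
    (
        df.write.format("bigquery")
        .option("temporaryGcsBucket", "my-staging-bucket")  # hypothetical bucket
        .mode("overwrite")
        .save("my_dataset.prcp_mean")  # hypothetical table
    )

    # Direct write (new path): rows stream straight into BigQuery through the
    # Storage Write API, so no staging bucket has to exist or be cleaned up.
    (
        df.write.format("bigquery")
        .option("writeMethod", "direct")
        .mode("overwrite")
        .save("my_dataset.prcp_mean")
    )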

composer/2022_airflow_summit/data_analytics_process_expansion_test.py

Lines changed: 4 additions & 4 deletions
@@ -53,7 +53,7 @@
 BQ_PHX_PRCP_TABLE_NAME = "phx_annual_prcp"
 BQ_PHX_SNOW_TABLE_NAME = "phx_annual_snow"

-PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
+PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
 PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"

@@ -99,8 +99,8 @@ def test_dataproc_batch(test_bucket, bq_dataset):
     )
     request = dataproc.CreateBatchRequest(
         parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
-        batch=test_dataproc_batch[1],
-        batch_id=test_dataproc_batch[0],
+        batch=BATCH_CONFIG,
+        batch_id=BATCH_ID,
     )
     try:
         # Make the request
@@ -111,7 +111,7 @@ def test_dataproc_batch(test_bucket, bq_dataset):
         response = operation.result()
     except Aborted as e:
         # retry once if we see a flaky 409 "subnet not ready error"
-        if "/subnetworks/default" in e:
+        if "/subnetworks/default" in str(e):
             # delete the errored out batch so we don't see an "AlreadyExists"
             delete_request = dataproc.DeleteBatchRequest(
                 name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"

composer/2022_airflow_summit/data_analytics_process_test.py

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ def test_dataproc_batch(test_bucket, bq_dataset):
         response = operation.result()
     except Aborted as e:
         # retry once if we see a flaky 409 "subnet not ready error"
-        if "/subnetworks/default" in e:
+        if "/subnetworks/default" in str(e):
             # delete the errored out batch so we don't see an "AlreadyExists"
             delete_request = dataproc.DeleteBatchRequest(
                 name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
