
Commit dba937c

refactor test to match #8328
1 parent 846570f commit dba937c

1 file changed: +60 -40 lines changed

composer/2022_airflow_summit/data_analytics_process_expansion_test.py

Lines changed: 60 additions & 40 deletions
@@ -20,7 +20,6 @@
 import os
 import uuid
 
-import backoff
 from google.api_core.exceptions import Aborted, NotFound
 from google.cloud import bigquery
 from google.cloud import dataproc_v1 as dataproc
@@ -58,8 +57,22 @@
 PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"
 
 
-@pytest.fixture(scope="module")
-def test_dataproc_batch():
+@pytest.fixture(scope="function")
+def test_dataproc_batch(test_bucket, bq_dataset):
+    # Check that the results tables aren't there yet.
+    # Considered using pytest parametrize, but did not want the rest of
+    # the test to run 5 times - only this part.
+    output_tables = [
+        BQ_NORMALIZED_TABLE_NAME,
+        BQ_PRCP_MEAN_TABLE_NAME,
+        BQ_SNOW_MEAN_TABLE_NAME,
+        BQ_PHX_PRCP_TABLE_NAME,
+        BQ_PHX_SNOW_TABLE_NAME,
+    ]
+    for output_table in output_tables:
+        with pytest.raises(NotFound):
+            BQ_CLIENT.get_table(f"{BQ_DESTINATION_DATASET_NAME}.{output_table}")
+
 
     BATCH_ID = f"summit-dag-expansion-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
     BATCH_CONFIG = {
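The comment in the hunk above explains why the precondition checks use a plain loop: pytest.mark.parametrize would rerun the entire test once per table, not just the check. A minimal sketch of that trade-off, with a hypothetical get_table standing in for BQ_CLIENT.get_table:

import pytest

TABLES = ["t_norm", "t_prcp", "t_snow", "t_phx_prcp", "t_phx_snow"]


def get_table(name):
    # Hypothetical stand-in: behaves like a lookup of a missing table.
    raise LookupError(name)


def test_tables_absent_once():
    # One test run; the loop covers all five tables inside it.
    for table in TABLES:
        with pytest.raises(LookupError):
            get_table(table)


@pytest.mark.parametrize("table", TABLES)
def test_tables_absent_parametrized(table):
    # Five test runs; fine for this check alone, but anything else in the
    # same test (batch creation, DAG trigger) would also run five times.
    with pytest.raises(LookupError):
        get_table(table)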
@@ -78,7 +91,47 @@ def test_dataproc_batch():
         }
     }
 
-    yield (BATCH_ID, BATCH_CONFIG)
+    # Create the batch.
+    dataproc_client = dataproc.BatchControllerClient(
+        client_options={
+            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
+        }
+    )
+    request = dataproc.CreateBatchRequest(
+        parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
+        batch=BATCH_CONFIG,
+        batch_id=BATCH_ID,
+    )
+    try:
+        # Make the request.
+        operation = dataproc_client.create_batch(request=request)
+
+        print("Waiting for operation to complete...")
+
+        response = operation.result()
+    except Aborted as e:
+        # Retry once if we see the flaky 409 "subnet not ready" error.
+        if "/subnetworks/default" in str(e):
+            # Delete the errored-out batch so we don't get "AlreadyExists".
+            delete_request = dataproc.DeleteBatchRequest(
+                name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
+            )
+            dataproc_client.delete_batch(request=delete_request)
+            # Retry the creation operation once.
+            create_request = dataproc.CreateBatchRequest(
+                parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
+                batch=BATCH_CONFIG,
+                batch_id=BATCH_ID,
+            )
+            operation = dataproc_client.create_batch(request=create_request)
+
+            print("Waiting for operation to complete...")
+
+            response = operation.result()
+        else:
+            raise
+
+    yield response
     dataproc_client = dataproc.BatchControllerClient(
         client_options={
             "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
@@ -162,43 +215,10 @@ def bq_dataset(test_bucket):
         print(f"Ignoring NotFound on cleanup, details: {e}")
 
 
-# Retry if we see a flaky 409 "subnet not ready" exception
-@backoff.on_exception(backoff.expo, Aborted, max_tries=3)
-def test_process(test_dataproc_batch):
-    # check that the results tables aren't there
-    # considered using pytest parametrize, but did not want rest of test
-    # to run 5 times - only this part
-    output_tables = [
-        BQ_NORMALIZED_TABLE_NAME,
-        BQ_PRCP_MEAN_TABLE_NAME,
-        BQ_SNOW_MEAN_TABLE_NAME,
-        BQ_PHX_PRCP_TABLE_NAME,
-        BQ_PHX_SNOW_TABLE_NAME,
-    ]
-    for output_table in output_tables:
-        with pytest.raises(NotFound):
-            BQ_CLIENT.get_table(f"{BQ_DESTINATION_DATASET_NAME}.{output_table}")
-
-    # create a batch
-    dataproc_client = dataproc.BatchControllerClient(
-        client_options={
-            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
-        }
-    )
-    request = dataproc.CreateBatchRequest(
-        parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
-        batch=test_dataproc_batch[1],
-        batch_id=test_dataproc_batch[0],
-    )
-    # Make the request
-    operation = dataproc_client.create_batch(request=request)
 
-    print("Waiting for operation to complete...")
-
-    response = operation.result()
-
-    # Handle the response
-    print(response)
+def test_process(test_dataproc_batch):
+    print(test_dataproc_batch)
+
 
     # check that the results table is there now
     assert (
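With batch creation moved into the fixture, test_process only consumes the yielded Batch response. Declaring test_bucket and bq_dataset as fixture parameters also guarantees ordering: pytest builds dependencies before dependents and tears them down in reverse. A minimal sketch of that ordering, with hypothetical bucket and batch fixtures:

import pytest


@pytest.fixture(scope="function")
def bucket():
    print("1. bucket ready")       # set up first (no dependencies)
    yield "bucket"
    print("4. bucket cleaned up")  # torn down last


@pytest.fixture(scope="function")
def batch(bucket):                 # depends on bucket, so runs second
    print("2. batch created")
    yield f"batch-in-{bucket}"
    print("3. batch deleted")      # torn down before its dependency


def test_process(batch):
    assert batch == "batch-in-bucket"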
