Commit ff545c2

leahecole and parthea authored

remove backoff, add manual retry (#8328)

* remove backoff, add manual retry
* fix lint
* remove unused import

Co-authored-by: Anthonios Partheniou <[email protected]>

1 parent 2671b97 · commit ff545c2
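
The substance of the change: instead of wrapping the whole test in `@backoff.on_exception(backoff.expo, Aborted, max_tries=3)` and re-running everything on any Aborted error, the batch-creation step now catches the one known-flaky 409 "subnet not ready" failure, deletes the partial batch, and retries exactly once. A minimal sketch of that before/after pattern, assuming hypothetical create_resource() and cleanup() helpers that are not part of the diff:

# Sketch only: create_resource() and cleanup() are hypothetical stand-ins
# for dataproc_client.create_batch(...) and the delete_batch(...) call.
from google.api_core.exceptions import Aborted


def create_resource():
    """Stand-in for the RPC that occasionally fails with Aborted (HTTP 409)."""
    raise NotImplementedError


def cleanup():
    """Stand-in for deleting the half-created resource before retrying."""


def create_with_manual_retry():
    # Before this commit: @backoff.on_exception(backoff.expo, Aborted,
    # max_tries=3) retried on *any* Aborted and re-ran the whole test body.
    # After: one targeted retry for the single failure mode known to be flaky.
    try:
        return create_resource()
    except Aborted as e:
        if "/subnetworks/default" not in str(e):
            raise  # unrelated Aborted errors still fail the test
        cleanup()  # avoid AlreadyExists on the retry
        return create_resource()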

File tree: 1 file changed, +46 −34 lines changed

composer/2022_airflow_summit/data_analytics_process_test.py

@@ -20,7 +20,6 @@
 import os
 import uuid
 
-import backoff
 from google.api_core.exceptions import Aborted, NotFound
 from google.cloud import bigquery
 from google.cloud import dataproc_v1 as dataproc
@@ -50,12 +49,13 @@
 PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"
 
 
-@pytest.fixture(scope="module")
-def test_dataproc_batch():
+@pytest.fixture(scope="function")
+def test_dataproc_batch(test_bucket, bq_dataset):
+    # check that the results table isn't there
+    with pytest.raises(NotFound):
+        BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
 
-    BATCH_ID = (
-        f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
-    )
+    BATCH_ID = f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
     BATCH_CONFIG = {
         "pyspark_batch": {
             "jar_file_uris": [PYSPARK_JAR],
@@ -68,12 +68,48 @@ def test_dataproc_batch():
         },
     }
 
-    yield (BATCH_ID, BATCH_CONFIG)
+    # create a batch
     dataproc_client = dataproc.BatchControllerClient(
         client_options={
            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
         }
     )
+    request = dataproc.CreateBatchRequest(
+        parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
+        batch=BATCH_CONFIG,
+        batch_id=BATCH_ID,
+    )
+    try:
+        # Make the request
+        operation = dataproc_client.create_batch(request=request)
+
+        print("Waiting for operation to complete...")
+
+        response = operation.result()
+    except Aborted as e:
+        # retry once if we see a flaky 409 "subnet not ready" error
+        if "/subnetworks/default" in str(e):
+            # delete the errored-out batch so we don't see an "AlreadyExists" error
+            delete_request = dataproc.DeleteBatchRequest(
+                name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
+            )
+            dataproc_client.delete_batch(request=delete_request)
+            # retry the creation operation once
+            create_request = dataproc.CreateBatchRequest(
+                parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
+                batch=BATCH_CONFIG,
+                batch_id=BATCH_ID,
+            )
+            operation = dataproc_client.create_batch(request=create_request)
+
+            print("Waiting for operation to complete...")
+
+            response = operation.result()
+        else:
+            raise e
+
+    yield response
+
     request = dataproc.DeleteBatchRequest(
         name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
     )
@@ -108,7 +144,7 @@ def test_bucket():
     bucket.delete(force=True)
 
 
-@pytest.fixture(autouse=True)
+@pytest.fixture(scope="module")
 def bq_dataset(test_bucket):
     # Create dataset and table for the test CSV
    BQ_CLIENT.create_dataset(BQ_DATASET)
@@ -145,33 +181,9 @@ def bq_dataset(test_bucket):
     print(f"Ignoring NotFound on cleanup, details: {e}")
 
 
-# Retry if we see a flaky 409 "subnet not ready" exception
-@backoff.on_exception(backoff.expo, Aborted, max_tries=3)
 def test_process(test_dataproc_batch):
-    # check that the results table isnt there
-    with pytest.raises(NotFound):
-        BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
-
-    # create a batch
-    dataproc_client = dataproc.BatchControllerClient(
-        client_options={
-            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
-        }
-    )
-    request = dataproc.CreateBatchRequest(
-        parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
-        batch=test_dataproc_batch[1],
-        batch_id=test_dataproc_batch[0],
-    )
-    # Make the request
-    operation = dataproc_client.create_batch(request=request)
-
-    print("Waiting for operation to complete...")
-
-    response = operation.result()
 
-    # Handle the response
-    print(response)
+    print(test_dataproc_batch)
 
     # check that the results table is there now
-    assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0
+    assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0
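
Worth noting for readers adapting this: the create/delete lifecycle now lives entirely in the fixture, with yield marking the handoff to the test, so cleanup runs even when the assertion fails. A self-contained sketch of that setup/yield/teardown shape (resource names here are hypothetical, not from the diff):

import pytest


@pytest.fixture(scope="function")
def batch_like_resource():
    resource = {"id": "demo"}   # setup: create the resource (with any retry logic) here
    yield resource              # hand the created resource to the test
    resource.clear()            # teardown: runs even if the test fails


def test_consumes_fixture(batch_like_resource):
    # the test body only inspects results; lifecycle stays in the fixture
    assert batch_like_resource["id"] == "demo"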
