Skip to content

Commit 0af547f

Browse files
leahecole authored and parthea committed
remove backoff, add manual retry (#8328)
* remove backoff, add manual retry

* fix lint

* remove unused import

Co-authored-by: Anthonios Partheniou <[email protected]>
1 parent 7a8b0c0 commit 0af547f

File tree

1 file changed

+46
-34
lines changed

1 file changed

+46
-34
lines changed

composer/2022_airflow_summit/data_analytics_process_test.py

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import os
2121
import uuid
2222

23-
import backoff
2423
from google.api_core.exceptions import Aborted, NotFound
2524
from google.cloud import bigquery
2625
from google.cloud import dataproc_v1 as dataproc
@@ -50,12 +49,13 @@
5049
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"
5150

5251

53-
@pytest.fixture(scope="module")
54-
def test_dataproc_batch():
52+
@pytest.fixture(scope="function")
53+
def test_dataproc_batch(test_bucket, bq_dataset):
54+
# check that the results table isn't there
55+
with pytest.raises(NotFound):
56+
BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
5557

56-
BATCH_ID = (
57-
f"summit-dag-test-{TEST_ID}" # Dataproc serverless only allows lowercase characters
58-
)
58+
BATCH_ID = f"summit-dag-test-{TEST_ID}" # Dataproc serverless only allows lowercase characters
5959
BATCH_CONFIG = {
6060
"pyspark_batch": {
6161
"jar_file_uris": [PYSPARK_JAR],
@@ -68,12 +68,48 @@ def test_dataproc_batch():
6868
},
6969
}
7070

71-
yield (BATCH_ID, BATCH_CONFIG)
71+
# create a batch
7272
dataproc_client = dataproc.BatchControllerClient(
7373
client_options={
7474
"api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
7575
}
7676
)
77+
request = dataproc.CreateBatchRequest(
78+
parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
79+
batch=BATCH_CONFIG,
80+
batch_id=BATCH_ID,
81+
)
82+
try:
83+
# Make the request
84+
operation = dataproc_client.create_batch(request=request)
85+
86+
print("Waiting for operation to complete...")
87+
88+
response = operation.result()
89+
except Aborted as e:
90+
# retry once if we see a flaky 409 "subnet not ready error"
91+
if "/subnetworks/default" in e:
92+
# delete the errored out batch so we don't see an "AlreadyExists"
93+
delete_request = dataproc.DeleteBatchRequest(
94+
name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
95+
)
96+
dataproc_client.delete_batch(request=delete_request)
97+
# retry the creation operation once
98+
create_request = dataproc.CreateBatchRequest(
99+
parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
100+
batch=BATCH_CONFIG,
101+
batch_id=BATCH_ID,
102+
)
103+
operation = dataproc_client.create_batch(request=create_request)
104+
105+
print("Waiting for operation to complete...")
106+
107+
response = operation.result()
108+
else:
109+
raise (e)
110+
111+
yield response
112+
77113
request = dataproc.DeleteBatchRequest(
78114
name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
79115
)
@@ -110,7 +146,7 @@ def test_bucket():
110146
bucket.delete(force=True)
111147

112148

113-
@pytest.fixture(autouse=True)
149+
@pytest.fixture(scope="module")
114150
def bq_dataset(test_bucket):
115151
# Create dataset and table for test CSV
116152
BQ_CLIENT.create_dataset(BQ_DATASET)
@@ -147,33 +183,9 @@ def bq_dataset(test_bucket):
147183
print(f"Ignoring NotFound on cleanup, details: {e}")
148184

149185

150-
# Retry if we see a flaky 409 "subnet not ready" exception
151-
@backoff.on_exception(backoff.expo, Aborted, max_tries=3)
152186
def test_process(test_dataproc_batch):
153-
# check that the results table isn't there
154-
with pytest.raises(NotFound):
155-
BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
156-
157-
# create a batch
158-
dataproc_client = dataproc.BatchControllerClient(
159-
client_options={
160-
"api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
161-
}
162-
)
163-
request = dataproc.CreateBatchRequest(
164-
parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
165-
batch=test_dataproc_batch[1],
166-
batch_id=test_dataproc_batch[0],
167-
)
168-
# Make the request
169-
operation = dataproc_client.create_batch(request=request)
170-
171-
print("Waiting for operation to complete...")
172-
173-
response = operation.result()
174187

175-
# Handle the response
176-
print(response)
188+
print(test_dataproc_batch)
177189

178190
# check that the results table is there now
179-
assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0
191+
assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0

0 commit comments

Comments (0)