Skip to content

remove backoff, add manual retry #8328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 20, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 46 additions & 34 deletions composer/2022_airflow_summit/data_analytics_process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import os
import uuid

import backoff
from google.api_core.exceptions import Aborted, NotFound
from google.cloud import bigquery
from google.cloud import dataproc_v1 as dataproc
Expand Down Expand Up @@ -50,12 +49,13 @@
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"


@pytest.fixture(scope="module")
def test_dataproc_batch():
@pytest.fixture(scope="function")
def test_dataproc_batch(test_bucket, bq_dataset):
    # check that the results table isn't there
with pytest.raises(NotFound):
BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")

BATCH_ID = (
f"summit-dag-test-{TEST_ID}" # Dataproc serverless only allows lowercase characters
)
BATCH_ID = f"summit-dag-test-{TEST_ID}" # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
"pyspark_batch": {
"jar_file_uris": [PYSPARK_JAR],
Expand All @@ -68,12 +68,48 @@ def test_dataproc_batch():
},
}

yield (BATCH_ID, BATCH_CONFIG)
# create a batch
dataproc_client = dataproc.BatchControllerClient(
client_options={
"api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
}
)
request = dataproc.CreateBatchRequest(
parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
)
try:
# Make the request
operation = dataproc_client.create_batch(request=request)

print("Waiting for operation to complete...")

response = operation.result()
except Aborted as e:
# retry once if we see a flaky 409 "subnet not ready error"
if "/subnetworks/default" in e:
# delete the errored out batch so we don't see an "AlreadyExists"
delete_request = dataproc.DeleteBatchRequest(
name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
)
dataproc_client.delete_batch(request=delete_request)
# retry the creation operation once
create_request = dataproc.CreateBatchRequest(
parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
)
operation = dataproc_client.create_batch(request=create_request)

print("Waiting for operation to complete...")

response = operation.result()
else:
raise (e)

yield response

request = dataproc.DeleteBatchRequest(
name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
)
Expand Down Expand Up @@ -108,7 +144,7 @@ def test_bucket():
bucket.delete(force=True)


@pytest.fixture(autouse=True)
@pytest.fixture(scope="module")
def bq_dataset(test_bucket):
    # Create dataset and table for the test CSV
BQ_CLIENT.create_dataset(BQ_DATASET)
Expand Down Expand Up @@ -145,33 +181,9 @@ def bq_dataset(test_bucket):
print(f"Ignoring NotFound on cleanup, details: {e}")


# Retry if we see a flaky 409 "subnet not ready" exception
@backoff.on_exception(backoff.expo, Aborted, max_tries=3)
def test_process(test_dataproc_batch):
    # check that the results table isn't there
with pytest.raises(NotFound):
BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")

# create a batch
dataproc_client = dataproc.BatchControllerClient(
client_options={
"api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
}
)
request = dataproc.CreateBatchRequest(
parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
batch=test_dataproc_batch[1],
batch_id=test_dataproc_batch[0],
)
# Make the request
operation = dataproc_client.create_batch(request=request)

print("Waiting for operation to complete...")

response = operation.result()

# Handle the response
print(response)
print(test_dataproc_batch)

# check that the results table is there now
assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0
assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0