@@ -21,7 +21,7 @@
import uuid

import backoff
- from google.api_core.exceptions import Aborted, NotFound, AlreadyExists
+ from google.api_core.exceptions import Aborted, NotFound
from google.cloud import bigquery
from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage
@@ -50,15 +50,12 @@
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"


- # Retry if we see a flaky 409 "subnet not ready" exception
- @backoff.on_exception(backoff.expo, Aborted, max_tries=3)
- @pytest.fixture(scope="function")
- def test_dataproc_batch(test_bucket, bq_dataset):
-     # check that the results table isnt there
-     with pytest.raises(NotFound):
-         BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
+ @pytest.fixture(scope="module")
+ def test_dataproc_batch():

-     BATCH_ID = f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
+     BATCH_ID = (
+         f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
+     )
    BATCH_CONFIG = {
        "pyspark_batch": {
            "jar_file_uris": [PYSPARK_JAR],
@@ -71,27 +68,7 @@ def test_dataproc_batch(test_bucket, bq_dataset):
        },
    }

-     # create a batch
-     dataproc_client = dataproc.BatchControllerClient(
-         client_options={
-             "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
-         }
-     )
-     request = dataproc.CreateBatchRequest(
-         parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
-         batch=BATCH_CONFIG,
-         batch_id=BATCH_ID,
-     )
-
-     # Make the request
-     operation = dataproc_client.create_batch(request=request)
-
-     print("Waiting for operation to complete...")
-
-     response = operation.result()
-
-     yield response
-
+     yield (BATCH_ID, BATCH_CONFIG)
    dataproc_client = dataproc.BatchControllerClient(
        client_options={
            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
@@ -133,7 +110,7 @@ def test_bucket():
    bucket.delete(force=True)


- @pytest.fixture(scope="module")
+ @pytest.fixture(autouse=True)
def bq_dataset(test_bucket):
    # Create dataset and table for test CSV
    BQ_CLIENT.create_dataset(BQ_DATASET)
@@ -170,9 +147,33 @@ def bq_dataset(test_bucket):
        print(f"Ignoring NotFound on cleanup, details: {e}")


+ # Retry if we see a flaky 409 "subnet not ready" exception
+ @backoff.on_exception(backoff.expo, Aborted, max_tries=3)
def test_process(test_dataproc_batch):
+     # check that the results table isn't there
+     with pytest.raises(NotFound):
+         BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
+
+     # create a batch
+     dataproc_client = dataproc.BatchControllerClient(
+         client_options={
+             "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
+         }
+     )
+     request = dataproc.CreateBatchRequest(
+         parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
+         batch=test_dataproc_batch[1],
+         batch_id=test_dataproc_batch[0],
+     )
+     # Make the request
+     operation = dataproc_client.create_batch(request=request)
+
+     print("Waiting for operation to complete...")
+
+     response = operation.result()

-     print(test_dataproc_batch)
+     # Handle the response
+     print(response)

    # check that the results table is there now
    assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0
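For readers unfamiliar with the retry decorator now applied to test_process: backoff.on_exception re-invokes the decorated callable whenever it raises the listed exception, waiting an exponentially growing interval between attempts, up to max_tries. A minimal standalone sketch of that pattern (the flaky_create_batch function and its attempt counter are hypothetical, used only to illustrate the behavior, not part of this change):

    import backoff
    from google.api_core.exceptions import Aborted

    attempts = {"count": 0}

    # Retry up to 3 times with exponential backoff whenever Aborted is raised,
    # mirroring the decorator placed on test_process in the diff above.
    @backoff.on_exception(backoff.expo, Aborted, max_tries=3)
    def flaky_create_batch():
        attempts["count"] += 1
        if attempts["count"] < 3:
            # Simulate the flaky 409 "subnet not ready" error
            raise Aborted("subnet not ready")
        return "batch created"

    print(flaky_create_batch())  # succeeds on the third attempt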