# GCP Project
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
TEST_ID = uuid.uuid4()
+ DATAPROC_REGION = "us-central1"
+

# Google Cloud Storage constants
BUCKET_NAME = f"data-analytics-process-test{TEST_ID}"
BQ_WRITE_TABLE = f"data-analytics-process-test-normalized-{TEST_ID}".replace("-", "_")
TABLE_ID = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_READ_TABLE}"

- DATAPROC_REGION = "us-central1"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"

- BATCH_ID = (
-     f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
- )
- BATCH_CONFIG = {
-     "pyspark_batch": {
-         "jar_file_uris": [PYSPARK_JAR],
-         "main_python_file_uri": PROCESSING_PYTHON_FILE,
-         "args": [
-             PROJECT_ID,
-             f"{BQ_DATASET}.{BQ_READ_TABLE}",
-             f"{BQ_DATASET}.{BQ_WRITE_TABLE}",
-         ],
-     },
- }
+
+ @pytest.fixture(scope="module")
+ def test_dataproc_batch():
+
+     BATCH_ID = (
+         f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
+     )
+     BATCH_CONFIG = {
+         "pyspark_batch": {
+             "jar_file_uris": [PYSPARK_JAR],
+             "main_python_file_uri": PROCESSING_PYTHON_FILE,
+             "args": [
+                 PROJECT_ID,
+                 f"{BQ_DATASET}.{BQ_READ_TABLE}",
+                 f"{BQ_DATASET}.{BQ_WRITE_TABLE}",
+             ],
+         },
+     }
+
+     yield (BATCH_ID, BATCH_CONFIG)
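+     # Everything after the yield is the fixture teardown: once the module's
+     # tests finish, delete the Dataproc batch that test_process created.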
+     dataproc_client = dataproc.BatchControllerClient(
+         client_options={
+             "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
+         }
+     )
+     request = dataproc.DeleteBatchRequest(
+         name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
+     )
+
+     # Make the request
+     response = dataproc_client.delete_batch(request=request)
+
+     # There will only be a response if the deletion fails
+     # otherwise response will be None
+     if response:
+         print(response)


@pytest.fixture(scope="module")
@@ -85,7 +108,6 @@ def test_bucket():
    bucket.delete(force=True)


- # TODO(coleleah): teardown any previous resources
@pytest.fixture(autouse=True)
def bq_dataset(test_bucket):
    # Create dataset and table for test CSV
@@ -125,7 +147,7 @@ def bq_dataset(test_bucket):

# Retry if we see a flaky 409 "subnet not ready" exception
@backoff.on_exception(backoff.expo, Aborted, max_tries=3)
- def test_process(test_bucket):
+ def test_process(test_dataproc_batch):
    # check that the results table isn't there
    with pytest.raises(NotFound):
        BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
@@ -138,8 +160,8 @@ def test_process(test_bucket):
    )
    request = dataproc.CreateBatchRequest(
        parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
-         batch=BATCH_CONFIG,
-         batch_id=BATCH_ID,
+         batch=test_dataproc_batch[1],
+         batch_id=test_dataproc_batch[0],
    )
    # Make the request
    operation = dataproc_client.create_batch(request=request)
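
A note on the fixture teardown above: if `test_process` fails before `create_batch` succeeds, the batch never exists, and the `delete_batch` call in the teardown can be expected to raise `NotFound`. Below is a minimal sketch of a more tolerant cleanup, assuming the same `google.cloud.dataproc_v1` client used in the fixture; the helper name `delete_batch_if_exists` is illustrative and not part of this change.

```python
from google.api_core.exceptions import NotFound
from google.cloud import dataproc_v1 as dataproc


def delete_batch_if_exists(project_id: str, region: str, batch_id: str) -> None:
    """Best-effort cleanup that ignores batches which were never created."""
    client = dataproc.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    request = dataproc.DeleteBatchRequest(
        name=f"projects/{project_id}/locations/{region}/batches/{batch_id}"
    )
    try:
        client.delete_batch(request=request)
    except NotFound:
        # Nothing to delete if the batch was never created.
        pass
```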