
Commit 0f79dfe

dataflow: extensible template with javascript udf (#8291)
* dataflow: extensible template with javascript udf
* address dandhlee's comments
* address david's review comments
* fix kwargs
1 parent ad02dcc commit 0f79dfe

6 files changed (+290, -11 lines)

dataflow/conftest.py

Lines changed: 75 additions & 11 deletions
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 from dataclasses import dataclass
+from google.api_core.exceptions import NotFound
 import itertools
 import json
 import logging
@@ -91,23 +92,44 @@ def storage_bucket(name: str) -> str:
         logging.info(f"Deleted storage_bucket: {bucket.name}")
 
     @staticmethod
-    def bigquery_dataset(name: str, project: str = PROJECT) -> str:
+    def bigquery_dataset(
+        name: str,
+        project: str = PROJECT,
+        location: str = REGION,
+    ) -> str:
         from google.cloud import bigquery
 
         bigquery_client = bigquery.Client()
 
         dataset_name = Utils.underscore_name(name)
-        dataset = bigquery_client.create_dataset(
-            bigquery.Dataset(f"{project}.{dataset_name}")
-        )
+        dataset = bigquery.Dataset(f"{project}.{dataset_name}")
+        dataset.location = location
+        result = bigquery_client.create_dataset(dataset)
+
+        logging.info(f"Created bigquery_dataset: {result.full_dataset_id}")
+        yield result.dataset_id
 
-        logging.info(f"Created bigquery_dataset: {dataset.full_dataset_id}")
-        yield dataset_name
+        try:
+            bigquery_client.delete_dataset(
+                f"{project}.{dataset_name}", delete_contents=True
+            )
+            logging.info(f"Deleted bigquery_dataset: {result.full_dataset_id}")
+        except NotFound:
+            logging.info(f"{result.full_dataset_id} already deleted.")
 
-        bigquery_client.delete_dataset(
-            f"{project}.{dataset_name}", delete_contents=True
+    @staticmethod
+    def bigquery_table(
+        dataset_name: str, table_name: str, project: str = PROJECT, **kwargs
+    ) -> str:
+        from google.cloud import bigquery
+        bigquery_client = bigquery.Client()
+        table = bigquery.Table(
+            f"{project}.{dataset_name}.{table_name}", **kwargs
         )
-        logging.info(f"Deleted bigquery_dataset: {dataset.full_dataset_id}")
+        result = bigquery_client.create_table(table)
+        logging.info(f"Created bigquery_table: {result.full_table_id}")
+        yield result.table_id
+        # This table will be deleted when the dataset is deleted.
 
     @staticmethod
     def bigquery_table_exists(
@@ -138,7 +160,7 @@ def pubsub_topic(name: str, project: str = PROJECT) -> str:
 
         publisher_client = pubsub.PublisherClient()
         topic_path = publisher_client.topic_path(project, Utils.hyphen_name(name))
-        topic = publisher_client.create_topic(topic_path)
+        topic = publisher_client.create_topic(request={"name": topic_path})
 
         logging.info(f"Created pubsub_topic: {topic.name}")
         yield topic.name
@@ -164,7 +186,9 @@ def pubsub_subscription(
         subscription_path = subscriber.subscription_path(
            project, Utils.hyphen_name(name)
         )
-        subscription = subscriber.create_subscription(subscription_path, topic_path)
+        subscription = subscriber.create_subscription(
+            request={"name": subscription_path, "topic": topic_path}
+        )
 
         logging.info(f"Created pubsub_subscription: {subscription.name}")
         yield subscription.name
@@ -525,6 +549,46 @@ def dataflow_flex_template_run(
 
         Utils.dataflow_jobs_cancel(job_id)
 
+    @staticmethod
+    def dataflow_extensible_template_run(
+        job_name: str,
+        template_path: str,
+        bucket_name: str,
+        parameters: Dict[str, str] = {},
+        project: str = PROJECT,
+        region: str = REGION,
+    ) -> str:
+        import yaml
+
+        unique_job_name = Utils.hyphen_name(job_name)
+        logging.info(f"dataflow_job_name: {unique_job_name}")
+        cmd = [
+            "gcloud",
+            "dataflow",
+            "jobs",
+            "run",
+            unique_job_name,
+            f"--gcs-location={template_path}",
+            f"--project={project}",
+            f"--region={region}",
+            f"--staging-location=gs://{bucket_name}/staging",
+        ] + [
+            f"--parameters={name}={value}"
+            for name, value in {
+                **parameters,
+            }.items()
+        ]
+        logging.info(cmd)
+
+        stdout = subprocess.check_output(cmd).decode("utf-8")
+        logging.info(f"Launched Dataflow Template job: {unique_job_name}")
+        job_id = yaml.safe_load(stdout)["id"]
+        logging.info(f"Dataflow Template job id: {job_id}")
+        logging.info(f">> {Utils.dataflow_job_url(job_id, project, region)}")
+        yield job_id
+
+        Utils.dataflow_jobs_cancel(job_id)
+
 
 @pytest.fixture(scope="session")
 def utils() -> Utils:
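
The helpers added above are written as generators: everything before the `yield` is setup, everything after it is teardown, and the sample's test modules consume them from pytest fixtures with `yield from`. A minimal sketch of that pattern (the fixture and the `location` value below are illustrative assumptions, not part of this commit):

    import pytest
    from conftest import Utils  # the Utils class defined in dataflow/conftest.py

    NAME = "dataflow/extensible-template"  # hypothetical sample name

    @pytest.fixture(scope="session")
    def bq_dataset(utils: Utils) -> str:
        # Runs bigquery_dataset() up to its `yield` (create the dataset) during
        # fixture setup, then resumes past the `yield` (delete the dataset,
        # tolerating NotFound) during fixture teardown.
        yield from utils.bigquery_dataset(NAME, location="us-central1")
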
Lines changed: 37 additions & 0 deletions (new file: the JavaScript UDF, dataflow_udf_transform.js)
@@ -0,0 +1,37 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// [START dataflow_extensible_template_udf]
+/**
+ * User-defined function (UDF) to transform events
+ * as part of a Dataflow template job.
+ *
+ * @param {string} inJson input Pub/Sub JSON message (stringified)
+ */
+function process(inJson) {
+  // Nashorn engine is only ECMAScript 5.1 (ES5) compliant. Newer ES6
+  // JavaScript keywords like `let` or `const` will cause syntax errors.
+  var obj = JSON.parse(inJson);
+  var includePubsubMessage = obj.data && obj.attributes;
+  var data = includePubsubMessage ? obj.data : obj;
+
+  if (!data.hasOwnProperty('url')) {
+    throw new Error("No url found");
+  } else if (data.url !== "https://beam.apache.org/") {
+    throw new Error("Unrecognized url");
+  }
+
+  return JSON.stringify(obj);
+}
+// [END dataflow_extensible_template_udf]
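
Because classic templates run the UDF in the Nashorn engine (ES5 only), the function stays deliberately simple: parse the stringified message, validate it, and either return a JSON string or throw so the element is routed to the template's error output. A rough Python rendering of the same logic, included only to illustrate the behavior (it is not part of the commit and is never executed by Dataflow):

    import json

    def process(in_json: str) -> str:
        # Mirror of the JavaScript UDF's checks, for illustration only.
        obj = json.loads(in_json)
        # If the full Pub/Sub message (data + attributes) was passed, validate
        # the inner payload; otherwise the message itself is the payload.
        data = obj["data"] if ("data" in obj and "attributes" in obj) else obj

        if "url" not in data:
            raise ValueError("No url found")          # routed to error records
        if data["url"] != "https://beam.apache.org/":
            raise ValueError("Unrecognized url")      # routed to error records

        return json.dumps(obj)
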
Lines changed: 129 additions & 0 deletions (new file: the pytest end-to-end test for the sample)
@@ -0,0 +1,129 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+import json
+import os
+import time
+
+from google.cloud import bigquery
+from google.cloud import pubsub
+from google.cloud import storage
+
+try:
+    # `conftest` cannot be imported when running in `nox`, but we still
+    # try to import it for the autocomplete when writing the tests.
+    from conftest import Utils
+except ModuleNotFoundError:
+    Utils = None
+import pytest
+
+PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]
+
+NAME = "dataflow/extensible-template"
+
+BQ_TABLE = "ratings"
+
+
+@pytest.fixture(scope="session")
+def bucket_name(utils: Utils) -> str:
+    yield from utils.storage_bucket(NAME)
+
+
+@pytest.fixture(scope="session")
+def pubsub_topic(utils: Utils) -> str:
+    yield from utils.pubsub_topic(NAME)
+
+
+@pytest.fixture(scope="session")
+def bq_dataset(utils: Utils) -> str:
+    yield from utils.bigquery_dataset(NAME)
+
+
+@pytest.fixture(scope="session")
+def bq_table(utils: Utils, bq_dataset: str) -> str:
+    yield from utils.bigquery_table(
+        bq_dataset,
+        BQ_TABLE,
+        schema=[
+            bigquery.SchemaField("url", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("review", "STRING", mode="REQUIRED"),
+        ],
+    )
+
+
+@pytest.fixture(scope="session")
+def dataflow_job_id(
+    utils: Utils,
+    bucket_name: str,
+    pubsub_topic: str,
+    bq_dataset: str,
+    bq_table: str,
+) -> str:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    blob = bucket.blob("js/dataflow_udf_transform.js")
+    blob.upload_from_filename(os.path.join(os.getcwd(), "dataflow_udf_transform.js"))
+    output_table = f"{PROJECT}:{bq_dataset}.{bq_table}"
+    yield from utils.dataflow_extensible_template_run(
+        job_name=NAME,
+        template_path="gs://dataflow-templates/latest/PubSub_to_BigQuery",
+        bucket_name=bucket_name,
+        parameters={
+            "inputTopic": pubsub_topic,
+            "outputTableSpec": output_table,
+            "javascriptTextTransformGcsPath": f"gs://{bucket_name}/js/dataflow_udf_transform.js",
+            "javascriptTextTransformFunctionName": "process",
+        },
+    )
+
+
+def test_extensible_template(
+    utils: Utils,
+    pubsub_topic: str,
+    dataflow_job_id: str,
+    bq_dataset: str,
+    bq_table: str,
+) -> None:
+    publisher_client = pubsub.PublisherClient()
+    for i in range(30):
+        good_msg = json.dumps(
+            {
+                "url": "https://beam.apache.org/",
+                "review": "positive" if i % 2 == 0 else "negative",
+            }
+        )
+        publisher_client.publish(pubsub_topic, good_msg.encode("utf-8"))
+        bad_msg = json.dumps(
+            {
+                "url": "https://kafka.apache.org/",
+                "review": "positive" if i % 2 == 0 else "negative",
+            }
+        )
+        publisher_client.publish(pubsub_topic, bad_msg.encode("utf-8"))
+        time.sleep(10)
+
+    # Wait until the dataflow job starts running successfully.
+    # The job is cancelled as part of the teardown to avoid leaking resource.
+    utils.dataflow_jobs_wait(
+        dataflow_job_id,
+        target_states={"JOB_STATE_RUNNING"},
+        timeout_sec=300,
+        poll_interval_sec=30,
+    )
+
+    query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}`"
+    good_records = utils.bigquery_query(query)
+    assert len(list(good_records)) > 0
+
+    query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}_error_records`"
+    bad_records = utils.bigquery_query(query)
+    assert len(list(bad_records)) > 0
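
Outside of the test, the same tables can be inspected directly with the BigQuery client: rows that pass the UDF land in the output table, while rows the UDF rejects are written by the Pub/Sub-to-BigQuery template to a table suffixed with `_error_records` (the same suffix the test queries above). A minimal sketch, with placeholder project, dataset, and table names (assumptions, not values from this commit):

    from google.cloud import bigquery

    client = bigquery.Client()
    table = "my-project.my_dataset.ratings"  # placeholder fully qualified table

    # Messages the UDF accepted.
    for row in client.query(f"SELECT url, review FROM `{table}` LIMIT 5").result():
        print(dict(row))

    # Messages the UDF threw on, dead-lettered by the template.
    errors = client.query(f"SELECT COUNT(*) AS n FROM `{table}_error_records`").result()
    print(list(errors))
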
Lines changed: 42 additions & 0 deletions (new file: nox test configuration override)
@@ -0,0 +1,42 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default TEST_CONFIG_OVERRIDE for python repos.
+
+# You can copy this file into your directory, then it will be inported from
+# the noxfile.py.
+
+# The source of truth:
+# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py
+
+TEST_CONFIG_OVERRIDE = {
+    # You can opt out from the test for specific Python versions.
+    "ignored_versions": ["2.7", "3.6", "3.7", "3.8", "3.10"],
+    # Old samples are opted out of enforcing Python type hints
+    # All new samples should feature them
+    "enforce_type_hints": True,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
+    # build specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+    # If you need to use a specific version of pip,
+    # change pip_version_override to the string representation
+    # of the version number, for example, "20.2.4"
+    "pip_version_override": None,
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
+    # "envs": {},
+}
Lines changed: 6 additions & 0 deletions (new file: test requirements pins)
@@ -0,0 +1,6 @@
+google-api-python-client==2.47.0
+google-cloud-bigquery==3.3.2
+google-cloud-storage==2.1.0
+pytest-xdist==2.5.0
+pytest==7.0.1
+pyyaml==6.0
Lines changed: 1 addition & 0 deletions (new file: sample requirements)
@@ -0,0 +1 @@
+apache-beam[gcp]==2.41.0
