Commit 6e6b7e2

sofisl, leahecole, and davidcavazos authored
feat: add dataflow-composer tutorial code samples (#3290)
* feat: add dataflow operator sample and test
* feat: add composer tutorial samples and tests
* fix: remove files
* fix: respond to comments
* fix: respond to comments
* fix: run lint
* fix import error
* feat: add dataflow template operator tutorial samples
* fix: commented out variables
* fix errors
* fix: trailing whitespace
* fix: errant underscore
* fix: shorten UUID since it is too long
* fix: assertion
* fix: uppercase to lowercase f
* fix: create table format
* fix: imports out of order
* fix: import functions from helper file to test
* fix: run lint
* fix imports
* fix: failing test
* fix: tests to have specific uuid
* fix: run linting, clean up code
* fix: variables and headers
* fix: linting, comments, variable names
* fix: add command-line option for Python sample
* feat: add comments for reader
* Remove argparser
* correct errors in dag
* fix import order
* address lint
* fix: typo in parameters
* feat: fix typos in tutorial
* fix: change typle result
* fix: change variable name
* add dataset_name variable
* fix: change template to batch vs. streaming
* fix: linting

Co-authored-by: Leah E. Cole <[email protected]>
Co-authored-by: Leah Cole <[email protected]>
Co-authored-by: David Cavazos <[email protected]>
1 parent ae45478 commit 6e6b7e2

4 files changed: +255 −0 lines
dataflowtemplateoperator_create_dataset_and_table_helper.py
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script is a helper for the Dataflow Template Operator Tutorial.
# It helps the user set up the BigQuery dataset and table that are needed
# for the tutorial.

# [START composer_dataflow_dataset_table_creation]

# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the BigQuery client library
# in your local environment by running `pip install google-cloud-bigquery`.

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP project ID
# The location where you want your BigQuery data to reside.
# For more info on possible locations, see
# https://cloud.google.com/bigquery/docs/locations
location = "US"
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table in this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

    # [END composer_dataflow_dataset_table_creation]
    return dataset, table


if __name__ == "__main__":
    create_dataset_and_table(project, location, dataset_name)
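
If you want to confirm that the helper created the dataset and table before moving on to the DAG, a minimal verification sketch using the same google-cloud-bigquery client is shown below. The project and table names are assumptions that should match whatever values you passed to the helper; this snippet is not part of the commit.

from google.cloud import bigquery

# Hypothetical verification snippet (not part of this commit). Replace
# "your-project" with the project used when running the helper above.
client = bigquery.Client(project="your-project")

table = client.get_table("your-project.average_weather.average_weather")  # API request
print(f"Found table {table.full_table_id} with {len(table.schema)} columns:")
for field in table.schema:
    print(f"  {field.name}: {field.field_type} ({field.mode})")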
Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script tests the helper for the Dataflow Template Operator Tutorial,
# which sets up the BigQuery dataset and table that are needed for the
# tutorial.

import os
import uuid

from google.cloud import bigquery

from . import dataflowtemplateoperator_create_dataset_and_table_helper as helper

PROJECT_ID = os.environ["GCLOUD_PROJECT"]

client = bigquery.Client()

# Use only the first segment of a random UUID so the dataset name stays short
# and contains no hyphens (BigQuery dataset IDs allow only letters, digits,
# and underscores).
dataset_UUID = str(uuid.uuid4()).split("-")[0]

expected_schema = [
    bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
    bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
    bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
]


def test_creation():
    dataset = None
    table = None
    try:
        dataset, table = helper.create_dataset_and_table(PROJECT_ID, "US", dataset_UUID)

        assert table.table_id == "average_weather"
        assert dataset.dataset_id == dataset_UUID
        assert table.schema == expected_schema

    finally:
        # Clean up. Deleting the dataset with delete_contents=True also
        # removes the table it contains.
        if dataset is not None:
            client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)
        if table is not None:
            client.delete_table(table, not_found_ok=True)
dataflowtemplateoperator_tutorial.py
Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_dataflow_dag]


"""Example Airflow DAG that launches the Dataflow GCS_Text_to_BigQuery template
to transform a CSV file in Cloud Storage and load the results into BigQuery.

This DAG relies on four Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow job.
* gce_zone - Google Compute Engine zone where the Cloud Dataflow job should run.
* gce_region - Google Compute Engine region where the Cloud Dataflow job should run.
  Learn more about the difference between zones and regions here:
  https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the user-defined
  function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")
gce_region = models.Variable.get("gce_region")


default_args = {
    # Tell Airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG Airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

# [END composer_dataflow_dag]
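
The DAG reads bucket_path from an Airflow variable and expects transformCSVtoJSON.js, inputFile.txt, and jsonSchema.json to already exist in that bucket. Below is a minimal sketch of staging those three files with the google-cloud-storage client, assuming local copies with the same names in the working directory; the bucket name and helper function are illustrative and not part of this commit.

from google.cloud import storage


def upload_tutorial_inputs(bucket_name):
    """Upload the UDF, input file, and JSON schema the DAG expects (sketch)."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    for filename in ["transformCSVtoJSON.js", "inputFile.txt", "jsonSchema.json"]:
        blob = bucket.blob(filename)
        blob.upload_from_filename(filename)  # Assumes a local file of the same name.
        print(f"Uploaded gs://{bucket_name}/{filename}")


if __name__ == "__main__":
    upload_tutorial_inputs("your-bucket-name")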
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import models

from . import unit_testing


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a sanity check recommended by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    models.Variable.set("bucket_path", "gs://example_bucket")
    models.Variable.set("project_id", "example-project")
    models.Variable.set("gce_zone", "us-central1-f")
    models.Variable.set("gce_region", "us-central1")
    from . import dataflowtemplateoperator_tutorial as module

    unit_testing.assert_has_valid_dag(module)
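
The unit_testing.assert_has_valid_dag helper is imported from a shared test module that is not included in this commit. As a rough, hypothetical sketch of what such a helper could look like (assuming it only needs to confirm that the imported module defines at least one airflow.models.DAG object), it might be written like this:

from airflow import models


def assert_has_valid_dag(module):
    """Assert that the given module defines at least one Airflow DAG (sketch)."""
    # Hypothetical implementation; the shared helper in the repository may differ.
    dags = [value for value in vars(module).values() if isinstance(value, models.DAG)]
    assert dags, f"{module.__name__} does not define any DAG objects"
    for dag in dags:
        assert dag.dag_id, "DAG is missing a dag_id"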
