Commit ab22243

chore: update pubsublite/streaming-analytics (#8006)

* chore: use truth for debugging pubsublite
* cleanup
* chore: update pubsublite/streaming-analytics
* linting
* linting

1 parent cd4c3e2 · commit ab22243
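The first bullet in the commit message switches the sample's tests over to [Truth](https://truth.dev), Google's fluent assertion library. The test files themselves are not shown in this view, but for context, a Truth-style check looks roughly like this (an illustrative sketch, not code from this commit; the class and values are hypothetical):

```java
import static com.google.common.truth.Truth.assertThat;

import java.util.Arrays;
import java.util.List;

public class PubsubliteToGcsCheck {
  public static void main(String[] args) {
    // Lines read back from the pipeline's Cloud Storage output (illustrative data).
    List<String> lines = Arrays.asList("Hello!", "World!");
    // Truth failures print the full actual value, which is what makes the
    // library handy for debugging integration tests like this sample's.
    assertThat(lines).contains("Hello!");
  }
}
```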

File tree: 6 files changed, +190 -173 lines changed

pubsublite/streaming-analytics/README.md

Lines changed: 146 additions & 98 deletions
````diff
@@ -1,6 +1,7 @@
 # Pub/Sub Lite with Cloud Dataflow
 
-[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/java-docs-samples&page=editor&open_in_editor=pubsublite/streaming-analytics/README.md)
+[![Open in Cloud
+Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/java-docs-samples&page=editor&open_in_editor=pubsublite/streaming-analytics/README.md)
 
 Samples showing how to use [Pub/Sub Lite] with [Cloud Dataflow].
 
````
````diff
@@ -14,66 +15,91 @@ function, and writes them to [Cloud Storage].
 
 Resources needed for this example:
 
-1. A pair of Pub/Sub Lite topic and subscription.
-2. A Cloud Storage bucket.
+1. A Pub/Sub Lite topic and subscription.
+1. A Cloud Storage bucket.
 
 ### Setting up
 
-1. [Enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_api,pubsublite.googleapis.com): Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Pub/Sub Lite.
-   > _When you enable Cloud Dataflow, which uses Compute Engine, a default Compute Engine service account with the Editor role (`roles/editor`) is created._
-2. You can skip this step if you are trying this example in a Google Cloud environment like Cloud Shell.
+1. [Enable the
+   APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_api,pubsublite.googleapis.com):
+   Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Pub/Sub Lite.
 
-   Otherwise, [create](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-gcloud) a user-managed service account and grant it the following roles on your project:
+   > _When you enable Cloud Dataflow, which uses Compute Engine, a default
+   > Compute Engine service account with the Editor role (`roles/editor`) is
+   > created._
+
+1. You can skip this step if you are trying this example in a Google Cloud
+   environment like Cloud Shell.
+
+   Otherwise,
+   [create](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-gcloud)
+   a user-managed service account and grant it the following roles on your
+   project:
    - `roles/dataflow.admin`
   - `roles/pubsublite.viewer`
   - `roles/pubsublite.subscriber`
   - `roles/logging.viewer`
 
-   Then [create](https://cloud.google.com/iam/docs/creating-managing-service-account-keys#iam-service-account-keys-create-gcloud) a service account key and point `GOOGLE_APPLICATION_CREDNETIALS` to your downloaded key file.
-   ```sh
-   export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/key/file
-   ```
-3. Create a Cloud Storage bucket. Your bucket name needs to be globally unique.
-   ```sh
-   export PROJECT_ID=$(gcloud config get-value project)
-   export BUCKET=your-gcs-bucket
+   Then
+   [create](https://cloud.google.com/iam/docs/creating-managing-service-account-keys#iam-service-account-keys-create-gcloud)
+   a service account key and point `GOOGLE_APPLICATION_CREDENTIALS` to your
+   downloaded key file.
+
+   ```sh
+   export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/key/file
+   ```
+
+1. Create a Cloud Storage bucket. Your bucket name needs to be globally unique.
+
+   ```sh
+   export PROJECT_ID=$(gcloud config get-value project)
+   export BUCKET=your-gcs-bucket
+
+   gsutil mb gs://$BUCKET
+   ```
+
+1. Create a Pub/Sub Lite topic and subscription. Set `LITE_LOCATION` to a
+   [Pub/Sub Lite location].
+
+   ```sh
+   export TOPIC=your-lite-topic
+   export SUBSCRIPTION=your-lite-subscription
+   export LITE_LOCATION=your-lite-location
+
+   gcloud pubsub lite-topics create $TOPIC \
+     --zone=$LITE_LOCATION \
+     --partitions=1 \
+     --per-partition-bytes=30GiB
+   gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
+     --zone=$LITE_LOCATION \
+     --topic=$TOPIC
+   ```
+
+1. Set `DATAFLOW_REGION` to a [Dataflow region] close to your Pub/Sub Lite
+   location.
+
+   ```sh
+   export DATAFLOW_REGION=your-dataflow-region
+   ```
 
-   gsutil mb gs://$BUCKET
-   ```
-4. Create a Pub/Sub Lite topic and subscription. Set `LITE_LOCATION` to a [Pub/Sub Lite location].
-   ```sh
-   export TOPIC=your-lite-topic
-   export SUBSCRIPTION=your-lite-subscription
-   export LITE_LOCATION=your-lite-location
-
-   gcloud pubsub lite-topics create $TOPIC \
-     --zone=$LITE_LOCATION \
-     --partitions=1 \
-     --per-partition-bytes=30GiB
-   gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
-     --zone=$LITE_LOCATION \
-     --topic=$TOPIC
-   ```
-5. Set `DATAFLOW_REGION` to a [Dataflow region] close to your Pub/Sub Lite location.
-   ```sh
-   export DATAFLOW_REGION=your-dateflow-region
-   ```
-
 ### Running the example
 
 [PubsubliteToGcs.java](src/main/java/examples/PubsubliteToGcs.java)
 
-The following example runs a streaming pipeline. Choose `DirectRunner` to test it locally or `DataflowRunner` to run it on Dataflow.
+The following example runs a streaming pipeline. Choose `DirectRunner` to test
+it locally or `DataflowRunner` to run it on Dataflow.
 
-+ `--subscription`: the Pub/Sub Lite subscription to read messages from
-+ `--output`: the full filepath of the output files
-+ `--windowSize [optional]`: the window size in minutes, defaults to 1
-+ `--runner [optional]`: `DataflowRunner` or `DirectRunner`
-+ `--project [optional]`: your project ID, optional if using `DirectRunner`
-+ `--region [optional]`: the Dataflow region, optional if using `DirectRunner`
-+ `--tempLocation`: a Cloud Storage location for temporary files, optional if using `DirectRunner`
+- `--subscription`: the Pub/Sub Lite subscription to read messages from
+- `--output`: the full filepath of the output files
+- `--windowSize [optional]`: the window size in minutes, defaults to 1
+- `--runner [optional]`: `DataflowRunner` or `DirectRunner`
+- `--project [optional]`: your project ID, optional if using `DirectRunner`
+- `--region [optional]`: the Dataflow region, optional if using `DirectRunner`
+- `--tempLocation`: a Cloud Storage location for temporary files, optional if
+  using `DirectRunner`
 
 Gradle:
+
 ```sh
 gradle execute -Dexec.args="\
   --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
````
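The setup hunk above creates every resource the pipeline needs. If you want to sanity-check them before launching a job, something like the following should work (not part of the README; it assumes the same environment variables and a gcloud release that accepts the `--zone` flag used above):

```sh
# Confirm the Lite resources and the bucket exist, using the names exported above.
gcloud pubsub lite-topics describe $TOPIC --zone=$LITE_LOCATION
gcloud pubsub lite-subscriptions describe $SUBSCRIPTION --zone=$LITE_LOCATION
gsutil ls -b gs://$BUCKET
```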
````diff
@@ -85,7 +111,8 @@ gradle execute -Dexec.args="\
   --tempLocation=gs://$BUCKET/temp"
 ```
 
-Maven:
+Maven:
+
 ```sh
 mvn compile exec:java \
   -Dexec.mainClass=examples.PubsubliteToGcs \
````
````diff
@@ -99,74 +126,95 @@ mvn compile exec:java \
   --tempLocation=gs://$BUCKET/temp"
 ```
 
-[Publish] some messages to your Lite topic. Then check for files in your Cloud Storage bucket.
+[Publish] some messages to your Lite topic. Then check for files in your Cloud
+Storage bucket.
 
 ```sh
 gsutil ls "gs://$BUCKET/samples/output*"
 ```
 
 ## (Optional) Creating a custom Dataflow template
-With a [`metadata.md`](metadata.md), you can create a [Dataflow Flex template]. Custom Dataflow Flex templates can be shared. You can run them with different input parameters.
 
-1. Create a fat JAR. You should see `target/pubsublite-streaming-bundled-1.0.jar` as an output.
-   ```sh
-   mvn clean package -DskipTests=true
-   ls -lh
-   ```
+With a [`metadata.md`](metadata.md), you can create a [Dataflow Flex template].
+Custom Dataflow Flex templates can be shared. You can run them with different
+input parameters.
+
+1. Create a fat JAR. You should see
+   `target/pubsublite-streaming-bundled-1.0.jar` as an output.
+
+   ```sh
+   mvn clean package -DskipTests=true
+   ls -lh
+   ```
+
+1. Provide names and locations for your template file and template container
+   image.
+
+   ```sh
+   export TEMPLATE_PATH="gs://$BUCKET/samples/pubsublite-to-gcs.json"
+   export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/pubsublite-to-gcs:latest"
+   ```
+
+1. Build a custom Flex template.
+
+   ```sh
+   gcloud dataflow flex-template build $TEMPLATE_PATH \
+     --image-gcr-path "$TEMPLATE_IMAGE" \
+     --sdk-language "JAVA" \
+     --flex-template-base-image JAVA11 \
+     --metadata-file "metadata.json" \
+     --jar "target/pubsublite-streaming-bundled-1.0.jar" \
+     --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
+   ```
+
+1. Run a job with the custom Flex template using `gcloud` or in Cloud Console.
+
+   > Note: Pub/Sub Lite allows only one subscriber to pull messages from one
+   > partition. If your Pub/Sub Lite topic has only one partition and you use a
+   > subscription attached to that topic in more than one Dataflow job, only one
+   > of them will get messages.
+
+   ```sh
+   gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
+     --template-file-gcs-location "$TEMPLATE_PATH" \
+     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
+     --parameters output="gs://$BUCKET/samples/template-output" \
+     --parameters windowSize=1 \
+     --region "$DATAFLOW_REGION"
+   ```
 
-2. Provide names and locations for your template file and template container image.
-   ```sh
-   export TEMPLATE_PATH="gs://$BUCKET/samples/pubsublite-to-gcs.json"
-   export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/pubsublite-to-gcs:latest"
-   ```
+## Cleaning up
 
-3. Build a custom Flex template.
-   ```sh
-   gcloud dataflow flex-template build $TEMPLATE_PATH \
-     --image-gcr-path "$TEMPLATE_IMAGE" \
-     --sdk-language "JAVA" \
-     --flex-template-base-image JAVA11 \
-     --metadata-file "metadata.json" \
-     --jar "target/pubsublite-streaming-bundled-1.0.jar" \
-     --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
-   ```
+1. Stop the pipeline. If you use `DirectRunner`, `Ctrl+C` to cancel. If you use
+   `DataflowRunner`, [click](https://console.cloud.google.com/dataflow/jobs) on
+   the job you want to stop, then choose "Cancel".
 
-4. Run a job with the custom Flex template using `gcloud` or in Cloud Console.
-   > Note: Pub/Sub Lite allows only one subscriber to pull messages from one partition. If your Pub/Sub Lite topic has only one partition and you use a subscription attached to that topic in more than one Dataflow jobs, only one of them will get messages.
-   ```sh
-   gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
-     --template-file-gcs-location "$TEMPLATE_PATH" \
-     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
-     --parameters output="gs://$BUCKET/samples/template-output" \
-     --parameters windowSize=1 \
-     --region "$DATAFLOW_REGION"
-   ```
+1. Delete the Lite topic and subscription.
 
-## Cleaning up
+   ```sh
+   gcloud pubsub lite-topics delete $TOPIC
+   gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
+   ```
 
-1. Stop the pipeline. If you use `DirectRunner`, `Ctrl+C` to cancel. If you use `DataflowRunner`, [click](https://console.cloud.google.com/dataflow/jobs) on the job you want to stop, then choose "Cancel".
+1. Delete the Cloud Storage objects:
 
-2. Delete the Lite topic and subscription.
-   ```sh
-   gcloud pubsub lite-topics delete $TOPIC
-   gcloud pubsub lite-subscription delete $SUBSCRIPTION
-   ```
-
-3. Delete the Cloud Storage objects:
-   ```sh
-   gsutil -m rm -rf "gs://$BUCKET/samples/output*"
-   ```
+   ```sh
+   gsutil -m rm -rf "gs://$BUCKET/samples/output*"
+   ```
 
-4. Delete the template image in Cloud Registry and delete the Flex template if you have created them.
-   ```sh
-   gcloud container images delete $TEMPLATE_IMAGE
-   gsutil rm $TEMPLATE_PATH
-   ```
+1. Delete the template image in Container Registry and delete the Flex template
+   if you have created them.
 
-5. Delete the Cloud Storage bucket:
-   ```sh
-   gsutil rb "gs://$BUCKET"
-   ```
+   ```sh
+   gcloud container images delete $TEMPLATE_IMAGE
+   gsutil rm $TEMPLATE_PATH
+   ```
+
+1. Delete the Cloud Storage bucket:
+
+   ```sh
+   gsutil rb "gs://$BUCKET"
+   ```
 
 [Apache Beam]: https://beam.apache.org/
 [Pub/Sub Lite]: https://cloud.google.com/pubsub/lite/docs/
````
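The README links out for the publish step without showing a command. One way to push a few test messages through the pipeline, assuming a gcloud release that includes the Pub/Sub Lite publish command (newer releases take `--location` where the older commands above take `--zone`):

```sh
# Publish a handful of test messages to the Lite topic.
for i in $(seq 1 5); do
  gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello number $i!"
done

# After a window (about a minute) has closed, output files should appear:
gsutil ls "gs://$BUCKET/samples/output*"
```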

pubsublite/streaming-analytics/build.gradle

Lines changed: 9 additions & 13 deletions
````diff
@@ -33,26 +33,22 @@ repositories {
   }
 }
 
-def beamVersion = '2.40.0'
-def slf4jVersion = '1.7.36'
+def beamVersion = '2.46.0'
+def slf4jVersion = '2.0.7'
 dependencies {
-  implementation 'com.github.spotbugs:spotbugs-annotations:4.7.3'
+  implementation enforcedPlatform("org.apache.beam:beam-sdks-java-io-google-cloud-platform:${beamVersion}")
+  implementation platform("com.google.cloud:libraries-bom:26.14.0")
+  implementation "com.github.spotbugs:spotbugs-annotations:4.7.3"
   implementation "org.slf4j:slf4j-api:${slf4jVersion}"
   implementation "org.slf4j:slf4j-jdk14:${slf4jVersion}"
-  implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
-  implementation "org.apache.beam:beam-sdks-java-io-google-cloud-platform:${beamVersion}"
   implementation "org.apache.beam:beam-examples-java:${beamVersion}"
+  implementation "org.apache.beam:beam-runners-google-cloud-dataflow-java:${beamVersion}"
+  implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
   runtimeOnly "org.apache.beam:beam-runners-direct-java:${beamVersion}"
-  runtimeOnly "org.apache.beam:beam-runners-google-cloud-dataflow-java:${beamVersion}"
-  testImplementation 'com.google.api-client:google-api-client:2.2.0'
-  testImplementation 'com.google.apis:google-api-services-dataflow:v1b3-rev20210825-1.32.1'
-  testImplementation 'com.google.cloud:google-cloud-core:2.12.0'
-  testImplementation 'com.google.cloud:google-cloud-storage:2.22.0'
-  testImplementation 'com.google.truth:truth:1.1.3'
-  testImplementation 'org.hamcrest:hamcrest-all:1.3'
+  testImplementation 'com.google.cloud:google-cloud-storage'
 }
 
-group = 'com.example'
+group = 'com.example.pubsublite'
 version = '1.0.0-SNAPSHOT'
 description = 'pubsublite-streaming'
 
````
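The rearranged dependency list maps directly onto what the sample does: the Pub/Sub Lite connector and `beam-sdks-java-core` at compile time, plus the Dataflow and direct runners. A heavily condensed sketch of that kind of pipeline, for orientation only (the real code is `src/main/java/examples/PubsubliteToGcs.java`; the class name and paths here are placeholders):

```java
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class PubsubLiteToGcsSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        // Read messages from a Pub/Sub Lite subscription (path is illustrative).
        .apply("Read", PubsubLiteIO.read(SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(
                "projects/my-project/locations/us-central1-a/subscriptions/my-sub"))
            .build()))
        // Decode each message payload to a String.
        .apply("Decode", MapElements.into(TypeDescriptors.strings())
            .via((SequencedMessage m) -> m.getMessage().getData().toStringUtf8()))
        // Group into fixed one-minute windows, mirroring --windowSize=1.
        .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        // Write one shard per window to Cloud Storage.
        .apply("Write", TextIO.write().to("gs://my-bucket/samples/output")
            .withWindowedWrites().withNumShards(1));
    pipeline.run().waitUntilFinish();
  }
}
```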

pubsublite/streaming-analytics/metadata.json

Lines changed: 1 addition & 1 deletion
````diff
@@ -1,5 +1,5 @@
 {
-  "name": "Pub/Sub Lite to Clous Storage",
+  "name": "Pub/Sub Lite to Cloud Storage",
   "description": "An Apache Beam streaming pipeline that reads messages from Pub/Sub Lite, applies fixed windowing on the messages, and writes the results to files on Cloud Storage.",
   "parameters": [
     {
````
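`metadata.json` is the Flex template's parameter manifest; this commit only fixes the display name, and the `parameters` array itself is truncated in this view. For orientation, an entry in a Dataflow template metadata file generally has this shape (a generic illustration of the documented schema, not this file's actual contents):

```json
{
  "name": "windowSize",
  "label": "Window size in minutes",
  "helpText": "Size of the fixed windows applied to the message stream.",
  "isOptional": true
}
```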
