chore: update pubsublite/streaming-analytics #8006


Merged 7 commits on May 8, 2023.

Changes from all commits
244 changes: 146 additions & 98 deletions pubsublite/streaming-analytics/README.md
@@ -1,6 +1,7 @@
# Pub/Sub Lite with Cloud Dataflow

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/java-docs-samples&page=editor&open_in_editor=pubsublite/streaming-analytics/README.md)

Samples showing how to use [Pub/Sub Lite] with [Cloud Dataflow].

@@ -14,66 +15,91 @@ function, and writes them to [Cloud Storage].

Resources needed for this example:

1. A Pub/Sub Lite topic and a subscription attached to that topic.
1. A Cloud Storage bucket.

### Setting up

1. [Enable the
APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_api,pubsublite.googleapis.com):
Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Pub/Sub Lite.

> _When you enable Cloud Dataflow, which uses Compute Engine, a default
> Compute Engine service account with the Editor role (`roles/editor`) is
> created._

1. You can skip this step if you are trying this example in a Google Cloud
environment like Cloud Shell.

Otherwise,
[create](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-gcloud)
a user-managed service account and grant it the following roles on your
project:
- `roles/dataflow.admin`
- `roles/pubsublite.viewer`
- `roles/pubsublite.subscriber`
- `roles/logging.viewer`
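
For reference, the linked page boils down to a few standard `gcloud` commands.
The block below is only a sketch: the service account name
`pubsublite-to-gcs-sa` is a placeholder, and `PROJECT_ID` is set here so the
block is self-contained (a later step sets it again).

```sh
# Placeholder name for the user-managed service account.
export SA_NAME=pubsublite-to-gcs-sa
# PROJECT_ID is also set in a later step; set it here to keep this block self-contained.
export PROJECT_ID=$(gcloud config get-value project)

gcloud iam service-accounts create $SA_NAME

# Grant the roles listed above to the new service account.
for role in roles/dataflow.admin roles/pubsublite.viewer \
  roles/pubsublite.subscriber roles/logging.viewer; do
  gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="$role"
done
```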

Then
[create](https://cloud.google.com/iam/docs/creating-managing-service-account-keys#iam-service-account-keys-create-gcloud)
a service account key and point `GOOGLE_APPLICATION_CREDENTIALS` to your
downloaded key file.

```sh
export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/key/file
```

1. Create a Cloud Storage bucket. Your bucket name needs to be globally unique.

```sh
export PROJECT_ID=$(gcloud config get-value project)
export BUCKET=your-gcs-bucket

gsutil mb gs://$BUCKET
```

1. Create a Pub/Sub Lite topic and subscription. Set `LITE_LOCATION` to a
[Pub/Sub Lite location].

```sh
export TOPIC=your-lite-topic
export SUBSCRIPTION=your-lite-subscription
export LITE_LOCATION=your-lite-location

gcloud pubsub lite-topics create $TOPIC \
--zone=$LITE_LOCATION \
--partitions=1 \
--per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
--zone=$LITE_LOCATION \
--topic=$TOPIC
```
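
To confirm both resources exist before starting the pipeline, you can run the
matching `describe` commands. This is a sketch: depending on your `gcloud`
version the flag may be `--location` rather than `--zone`.

```sh
# Each command prints the resource configuration if creation succeeded.
gcloud pubsub lite-topics describe $TOPIC --zone=$LITE_LOCATION
gcloud pubsub lite-subscriptions describe $SUBSCRIPTION --zone=$LITE_LOCATION
```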

1. Set `DATAFLOW_REGION` to a [Dataflow region] close to your Pub/Sub Lite
location.

```sh
export DATAFLOW_REGION=your-dataflow-region
```


### Running the example

[PubsubliteToGcs.java](src/main/java/examples/PubsubliteToGcs.java)

The following example runs a streaming pipeline. Choose `DirectRunner` to test
it locally or `DataflowRunner` to run it on Dataflow.

- `--subscription`: the Pub/Sub Lite subscription to read messages from
- `--output`: the full filepath of the output files
- `--windowSize [optional]`: the window size in minutes, defaults to 1
- `--runner [optional]`: `DataflowRunner` or `DirectRunner`
- `--project [optional]`: your project ID, optional if using `DirectRunner`
- `--region [optional]`: the Dataflow region, optional if using `DirectRunner`
- `--tempLocation`: a Cloud Storage location for temporary files, optional if
using `DirectRunner`

Gradle:

```sh
gradle execute -Dexec.args="\
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
@@ -85,7 +111,8 @@
--tempLocation=gs://$BUCKET/temp"
```

Maven:

```sh
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
@@ -99,74 +126,95 @@
--tempLocation=gs://$BUCKET/temp"
```

[Publish] some messages to your Lite topic. Then check for files in your Cloud
Storage bucket.
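
If you want to publish a few test messages from the command line instead of
from a client application, here is a minimal sketch. It assumes your installed
`gcloud` provides `gcloud pubsub lite-topics publish` with `--location` and
`--message` flags (older releases used `--zone`); check
`gcloud pubsub lite-topics publish --help` if the flags differ.

```sh
# Publish three small test messages to the Lite topic created earlier.
# --location may need to be --zone, depending on your gcloud version.
for i in 1 2 3; do
  gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="hello-$i"
done
```

Once a window closes (one minute by default), output files appear under the
output prefix: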

```sh
gsutil ls "gs://$BUCKET/samples/output*"
```

## (Optional) Creating a custom Dataflow template
With a [`metadata.json`](metadata.json), you can create a [Dataflow Flex template].
Custom Dataflow Flex templates can be shared. You can run them with different
input parameters.

1. Create a fat JAR. You should see
`target/pubsublite-streaming-bundled-1.0.jar` as an output.

```sh
mvn clean package -DskipTests=true
ls -lh
```

1. Provide names and locations for your template file and template container
image.

```sh
export TEMPLATE_PATH="gs://$BUCKET/samples/pubsublite-to-gcs.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/pubsublite-to-gcs:latest"
```

1. Build a custom Flex template.

```sh
gcloud dataflow flex-template build $TEMPLATE_PATH \
--image-gcr-path "$TEMPLATE_IMAGE" \
--sdk-language "JAVA" \
--flex-template-base-image JAVA11 \
--metadata-file "metadata.json" \
--jar "target/pubsublite-streaming-bundled-1.0.jar" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
```
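
To double-check that the template spec and the container image were created, a
quick sketch (the spec is just an object in your bucket; the image lives in
Container Registry):

```sh
# List the template spec file and the images pushed under your project.
gsutil ls "$TEMPLATE_PATH"
gcloud container images list --repository="gcr.io/$PROJECT_ID"
```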

1. Run a job with the custom Flex template using `gcloud` or in Cloud Console.

> Note: Pub/Sub Lite allows only one subscriber to pull messages from one
> partition. If your Pub/Sub Lite topic has only one partition and you use a
> subscription attached to that topic in more than one Dataflow job, only one
> of them will get messages.

```sh
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location "$TEMPLATE_PATH" \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region "$DATAFLOW_REGION"
```

## Cleaning up

1. Stop the pipeline. If you use `DirectRunner`, press `Ctrl+C` to cancel. If
you use `DataflowRunner`, [click](https://console.cloud.google.com/dataflow/jobs)
on the job you want to stop, then choose "Cancel".
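
If you prefer the command line for a `DataflowRunner` job, the sketch below
uses the standard `gcloud dataflow jobs` commands; `JOB_ID` is a placeholder
for the ID printed by the `list` command.

```sh
# Find the ID of the running streaming job, then request cancellation.
gcloud dataflow jobs list --region="$DATAFLOW_REGION" --status=active
gcloud dataflow jobs cancel JOB_ID --region="$DATAFLOW_REGION"
```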

1. Delete the Lite topic and subscription.

```sh
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
```

1. Delete the Cloud Storage objects:

```sh
gsutil -m rm -rf "gs://$BUCKET/samples/output*"
```

1. Delete the template image in Container Registry and delete the Flex template
if you have created them.

```sh
gcloud container images delete $TEMPLATE_IMAGE
gsutil rm $TEMPLATE_PATH
```

1. Delete the Cloud Storage bucket:

```sh
gsutil rb "gs://$BUCKET"
```

[Apache Beam]: https://beam.apache.org/
[Pub/Sub Lite]: https://cloud.google.com/pubsub/lite/docs/
22 changes: 9 additions & 13 deletions pubsublite/streaming-analytics/build.gradle
@@ -33,26 +33,22 @@ repositories {
}
}

def beamVersion = '2.40.0'
def slf4jVersion = '1.7.36'
def beamVersion = '2.46.0'
def slf4jVersion = '2.0.7'
dependencies {
implementation 'com.github.spotbugs:spotbugs-annotations:4.7.3'
implementation enforcedPlatform("org.apache.beam:beam-sdks-java-io-google-cloud-platform:${beamVersion}")
implementation platform("com.google.cloud:libraries-bom:26.14.0")
implementation "com.github.spotbugs:spotbugs-annotations:4.7.3"
implementation "org.slf4j:slf4j-api:${slf4jVersion}"
implementation "org.slf4j:slf4j-jdk14:${slf4jVersion}"
implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
implementation "org.apache.beam:beam-sdks-java-io-google-cloud-platform:${beamVersion}"
implementation "org.apache.beam:beam-examples-java:${beamVersion}"
implementation "org.apache.beam:beam-runners-google-cloud-dataflow-java:${beamVersion}"
implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
runtimeOnly "org.apache.beam:beam-runners-direct-java:${beamVersion}"
runtimeOnly "org.apache.beam:beam-runners-google-cloud-dataflow-java:${beamVersion}"
testImplementation 'com.google.api-client:google-api-client:2.2.0'
testImplementation 'com.google.apis:google-api-services-dataflow:v1b3-rev20210825-1.32.1'
testImplementation 'com.google.cloud:google-cloud-core:2.12.0'
testImplementation 'com.google.cloud:google-cloud-storage:2.22.0'
testImplementation 'com.google.truth:truth:1.1.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'com.google.cloud:google-cloud-storage'
}

group = 'com.example'
group = 'com.example.pubsublite'
version = '1.0.0-SNAPSHOT'
description = 'pubsublite-streaming'

2 changes: 1 addition & 1 deletion pubsublite/streaming-analytics/metadata.json
@@ -1,5 +1,5 @@
{
"name": "Pub/Sub Lite to Clous Storage",
"name": "Pub/Sub Lite to Cloud Storage",
"description": "An Apache Beam streaming pipeline that reads messages from Pub/Sub Lite, applies fixed windowing on the messages, and writes the results to files on Cloud Storage.",
"parameters": [
{