Skip to content

Fix flaky DLP inspect tests #2359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

// [START dlp_inspect_bigquery]

import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.SettableFuture;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.BigQueryOptions;
import com.google.privacy.dlp.v2.BigQueryTable;
Expand Down Expand Up @@ -54,19 +54,18 @@ public static void inspectBigQueryTable()
String projectId = "your-project-id";
String bigQueryDatasetId = "your-bigquery-dataset-id";
String bigQueryTableId = "your-bigquery-table-id";
String pubSubTopicId = "your-pubsub-topic-id";
String pubSubSubscriptionId = "your-pubsub-subscription-id";
inspectBigQueryTable(
projectId, bigQueryDatasetId, bigQueryTableId, pubSubTopicId, pubSubSubscriptionId);
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectBigQueryTable(projectId, bigQueryDatasetId, bigQueryTableId, topicId, subscriptionId);
}

// Inspects a BigQuery Table
public static void inspectBigQueryTable(
String projectId,
String bigQueryDatasetId,
String bigQueryTableId,
String pubSubTopicId,
String pubSubSubscriptionName)
String topicId,
String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
Expand Down Expand Up @@ -98,7 +97,7 @@ public static void inspectBigQueryTable(
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
Expand All @@ -119,36 +118,32 @@ public static void inspectBigQueryTable(
.build();

// Use the client to send the request.
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + job.getName());
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());

// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();

// Set up a Pub/Sub subscriber to listen for the job completion status
SettableFuture<Void> jobDone = SettableFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
MessageReceiver handleMessage =
ProjectSubscriptionName.of(projectId, subscriptionId);

MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
jobDone.set(null);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
subscriber.startAsync(); // Let the subscriber listen to messages
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();

// Wait for the original job to complete
try {
jobDone.get(10, TimeUnit.MINUTES);
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 10 minutes.");
System.out.println("Job was not completed after 15 minutes.");
return;
}

// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);

// Parse the response and process results.
Expand All @@ -161,5 +156,20 @@ public static void inspectBigQueryTable(
}
}
}

// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
// [END dlp_inspect_bigquery]
63 changes: 36 additions & 27 deletions dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

// [START dlp_inspect_datastore]

import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.SettableFuture;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DatastoreOptions;
Expand Down Expand Up @@ -55,19 +55,18 @@ public static void insepctDatastoreEntity()
String projectId = "your-project-id";
String datastoreNamespace = "your-datastore-namespace";
String datastoreKind = "your-datastore-kind";
String pubSubTopicId = "your-pubsub-topic-id";
String pubSubSubscriptionId = "your-pubsub-subscription-id";
insepctDatastoreEntity(
projectId, datastoreNamespace, datastoreKind, pubSubTopicId, pubSubSubscriptionId);
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
insepctDatastoreEntity(projectId, datastoreNamespace, datastoreKind, topicId, subscriptionId);
}

// Inspects a Datastore Entity.
public static void insepctDatastoreEntity(
String projectId,
String datastoreNamespce,
String datastoreKind,
String pubSubTopicId,
String pubSubSubscriptionName)
String topicId,
String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
Expand Down Expand Up @@ -99,7 +98,7 @@ public static void insepctDatastoreEntity(
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
Expand All @@ -120,37 +119,32 @@ public static void insepctDatastoreEntity(
.build();

// Use the client to send the request.
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + job.getName());
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());

// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();

// Set up a Pub/Sub subscriber to listen for the job completion status
SettableFuture<Void> jobDone = SettableFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
MessageReceiver handleMessage =
ProjectSubscriptionName.of(projectId, subscriptionId);

MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
jobDone.set(null);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
;
}
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
subscriber.startAsync(); // Let the subscriber listen to messages
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();

// Wait for the original job to complete
try {
jobDone.get(10, TimeUnit.MINUTES);
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 10 minutes.");
System.out.println("Job was not completed after 15 minutes.");
return;
}

// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);

// Parse the response and process results.
Expand All @@ -163,5 +157,20 @@ public static void insepctDatastoreEntity(
}
}
}

// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
// [END dlp_inspect_datastore]
59 changes: 35 additions & 24 deletions dlp/src/main/java/dlp/snippets/InspectGcsFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package dlp.snippets;

// [START dlp_inspect_gcs]

import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
Expand Down Expand Up @@ -52,14 +52,14 @@ public static void inspectGcsFile() throws InterruptedException, ExecutionExcept
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt";
String pubSubTopicId = "your-pubsub-topic-id";
String pubSubSubscriptionId = "your-pubsub-subscription-id";
inspectGcsFile(projectId, gcsUri, pubSubTopicId, pubSubSubscriptionId);
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectGcsFile(projectId, gcsUri, topicId, subscriptionId);
}

// Inspects a file in a Google Cloud Storage Bucket.
public static void inspectGcsFile(
String projectId, String gcsUri, String pubSubTopicId, String pubSubSubscriptionName)
String projectId, String gcsUri, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
Expand All @@ -84,7 +84,7 @@ public static void inspectGcsFile(
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
Expand All @@ -105,36 +105,32 @@ public static void inspectGcsFile(
.build();

// Use the client to send the request.
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + job.getName());
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());

// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();

// Set up a Pub/Sub subscriber to listen for the job completion status
SettableFuture<Void> jobDone = SettableFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
MessageReceiver handleMessage =
ProjectSubscriptionName.of(projectId, subscriptionId);

MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
jobDone.set(null);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
subscriber.startAsync(); // Let the subscriber listen to messages
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();

// Wait for the original job to complete
try {
jobDone.get(10, TimeUnit.MINUTES);
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 10 minutes.");
System.out.println("Job was not completed after 15 minutes.");
return;
}

// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);

// Parse the response and process results.
Expand All @@ -147,5 +143,20 @@ public static void inspectGcsFile(
}
}
}

// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
// [END dlp_inspect_gcs]