Skip to content

Commit 8a10d65

Browse files
shubha-rajanchingor13
authored andcommitted
samples: Fix flaky DLP inspect tests (#2359)
* refactored and increased time limit for inspect samples
1 parent c871b63 commit 8a10d65

File tree

3 files changed

+107
-77
lines changed

3 files changed

+107
-77
lines changed

dlp/snippets/snippets/src/main/java/dlp/snippets/InspectBigQueryTable.java

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
// [START dlp_inspect_bigquery]
2020

21+
import com.google.api.core.SettableApiFuture;
2122
import com.google.cloud.dlp.v2.DlpServiceClient;
2223
import com.google.cloud.pubsub.v1.AckReplyConsumer;
2324
import com.google.cloud.pubsub.v1.MessageReceiver;
2425
import com.google.cloud.pubsub.v1.Subscriber;
25-
import com.google.common.util.concurrent.SettableFuture;
2626
import com.google.privacy.dlp.v2.Action;
2727
import com.google.privacy.dlp.v2.BigQueryOptions;
2828
import com.google.privacy.dlp.v2.BigQueryTable;
@@ -54,19 +54,18 @@ public static void inspectBigQueryTable()
5454
String projectId = "your-project-id";
5555
String bigQueryDatasetId = "your-bigquery-dataset-id";
5656
String bigQueryTableId = "your-bigquery-table-id";
57-
String pubSubTopicId = "your-pubsub-topic-id";
58-
String pubSubSubscriptionId = "your-pubsub-subscription-id";
59-
inspectBigQueryTable(
60-
projectId, bigQueryDatasetId, bigQueryTableId, pubSubTopicId, pubSubSubscriptionId);
57+
String topicId = "your-pubsub-topic-id";
58+
String subscriptionId = "your-pubsub-subscription-id";
59+
inspectBigQueryTable(projectId, bigQueryDatasetId, bigQueryTableId, topicId, subscriptionId);
6160
}
6261

6362
// Inspects a BigQuery Table
6463
public static void inspectBigQueryTable(
6564
String projectId,
6665
String bigQueryDatasetId,
6766
String bigQueryTableId,
68-
String pubSubTopicId,
69-
String pubSubSubscriptionName)
67+
String topicId,
68+
String subscriptionId)
7069
throws ExecutionException, InterruptedException, IOException {
7170
// Initialize client that will be used to send requests. This client only needs to be created
7271
// once, and can be reused for multiple requests. After completing all of your requests, call
@@ -98,7 +97,7 @@ public static void inspectBigQueryTable(
9897
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();
9998

10099
// Specify the action that is triggered when the job completes.
101-
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
100+
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
102101
Action.PublishToPubSub publishToPubSub =
103102
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
104103
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
@@ -119,36 +118,32 @@ public static void inspectBigQueryTable(
119118
.build();
120119

121120
// Use the client to send the request.
122-
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
123-
System.out.println("Job created: " + job.getName());
121+
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
122+
System.out.println("Job created: " + dlpJob.getName());
123+
124+
// Set up a Pub/Sub subscriber to listen on the job completion status
125+
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
124126

125-
// Set up a Pub/Sub subscriber to listen for the job completion status
126-
SettableFuture<Void> jobDone = SettableFuture.create();
127127
ProjectSubscriptionName subscriptionName =
128-
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
129-
MessageReceiver handleMessage =
128+
ProjectSubscriptionName.of(projectId, subscriptionId);
129+
130+
MessageReceiver messageHandler =
130131
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
131-
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
132-
if (job.getName().equals(messageAttribute)) {
133-
jobDone.set(null);
134-
ackReplyConsumer.ack();
135-
} else {
136-
ackReplyConsumer.nack();
137-
}
132+
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
138133
};
139-
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
140-
subscriber.startAsync(); // Let the subscriber listen to messages
134+
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
135+
subscriber.startAsync();
141136

142137
// Wait for the original job to complete
143138
try {
144-
jobDone.get(10, TimeUnit.MINUTES);
139+
done.get(15, TimeUnit.MINUTES);
145140
} catch (TimeoutException e) {
146-
System.out.println("Job was not completed after 10 minutes.");
141+
System.out.println("Job was not completed after 15 minutes.");
147142
return;
148143
}
149144

150145
// Get the latest state of the job from the service
151-
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
146+
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
152147
DlpJob completedJob = dlp.getDlpJob(request);
153148

154149
// Parse the response and process results.
@@ -161,5 +156,20 @@ public static void inspectBigQueryTable(
161156
}
162157
}
163158
}
159+
160+
// handleMessage injects the job and settableFuture into the message reciever interface
161+
private static void handleMessage(
162+
DlpJob job,
163+
SettableApiFuture<Boolean> done,
164+
PubsubMessage pubsubMessage,
165+
AckReplyConsumer ackReplyConsumer) {
166+
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
167+
if (job.getName().equals(messageAttribute)) {
168+
done.set(true);
169+
ackReplyConsumer.ack();
170+
} else {
171+
ackReplyConsumer.nack();
172+
}
173+
}
164174
}
165175
// [END dlp_inspect_bigquery]

dlp/snippets/snippets/src/main/java/dlp/snippets/InspectDatastoreEntity.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
// [START dlp_inspect_datastore]
2020

21+
import com.google.api.core.SettableApiFuture;
2122
import com.google.cloud.dlp.v2.DlpServiceClient;
2223
import com.google.cloud.pubsub.v1.AckReplyConsumer;
2324
import com.google.cloud.pubsub.v1.MessageReceiver;
2425
import com.google.cloud.pubsub.v1.Subscriber;
25-
import com.google.common.util.concurrent.SettableFuture;
2626
import com.google.privacy.dlp.v2.Action;
2727
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
2828
import com.google.privacy.dlp.v2.DatastoreOptions;
@@ -55,19 +55,18 @@ public static void insepctDatastoreEntity()
5555
String projectId = "your-project-id";
5656
String datastoreNamespace = "your-datastore-namespace";
5757
String datastoreKind = "your-datastore-kind";
58-
String pubSubTopicId = "your-pubsub-topic-id";
59-
String pubSubSubscriptionId = "your-pubsub-subscription-id";
60-
insepctDatastoreEntity(
61-
projectId, datastoreNamespace, datastoreKind, pubSubTopicId, pubSubSubscriptionId);
58+
String topicId = "your-pubsub-topic-id";
59+
String subscriptionId = "your-pubsub-subscription-id";
60+
insepctDatastoreEntity(projectId, datastoreNamespace, datastoreKind, topicId, subscriptionId);
6261
}
6362

6463
// Inspects a Datastore Entity.
6564
public static void insepctDatastoreEntity(
6665
String projectId,
6766
String datastoreNamespce,
6867
String datastoreKind,
69-
String pubSubTopicId,
70-
String pubSubSubscriptionName)
68+
String topicId,
69+
String subscriptionId)
7170
throws ExecutionException, InterruptedException, IOException {
7271
// Initialize client that will be used to send requests. This client only needs to be created
7372
// once, and can be reused for multiple requests. After completing all of your requests, call
@@ -99,7 +98,7 @@ public static void insepctDatastoreEntity(
9998
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();
10099

101100
// Specify the action that is triggered when the job completes.
102-
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
101+
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
103102
Action.PublishToPubSub publishToPubSub =
104103
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
105104
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
@@ -120,37 +119,32 @@ public static void insepctDatastoreEntity(
120119
.build();
121120

122121
// Use the client to send the request.
123-
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
124-
System.out.println("Job created: " + job.getName());
122+
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
123+
System.out.println("Job created: " + dlpJob.getName());
124+
125+
// Set up a Pub/Sub subscriber to listen on the job completion status
126+
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
125127

126-
// Set up a Pub/Sub subscriber to listen for the job completion status
127-
SettableFuture<Void> jobDone = SettableFuture.create();
128128
ProjectSubscriptionName subscriptionName =
129-
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
130-
MessageReceiver handleMessage =
129+
ProjectSubscriptionName.of(projectId, subscriptionId);
130+
131+
MessageReceiver messageHandler =
131132
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
132-
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
133-
if (job.getName().equals(messageAttribute)) {
134-
jobDone.set(null);
135-
ackReplyConsumer.ack();
136-
} else {
137-
ackReplyConsumer.nack();
138-
;
139-
}
133+
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
140134
};
141-
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
142-
subscriber.startAsync(); // Let the subscriber listen to messages
135+
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
136+
subscriber.startAsync();
143137

144138
// Wait for the original job to complete
145139
try {
146-
jobDone.get(10, TimeUnit.MINUTES);
140+
done.get(15, TimeUnit.MINUTES);
147141
} catch (TimeoutException e) {
148-
System.out.println("Job was not completed after 10 minutes.");
142+
System.out.println("Job was not completed after 15 minutes.");
149143
return;
150144
}
151145

152146
// Get the latest state of the job from the service
153-
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
147+
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
154148
DlpJob completedJob = dlp.getDlpJob(request);
155149

156150
// Parse the response and process results.
@@ -163,5 +157,20 @@ public static void insepctDatastoreEntity(
163157
}
164158
}
165159
}
160+
161+
// handleMessage injects the job and settableFuture into the message reciever interface
162+
private static void handleMessage(
163+
DlpJob job,
164+
SettableApiFuture<Boolean> done,
165+
PubsubMessage pubsubMessage,
166+
AckReplyConsumer ackReplyConsumer) {
167+
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
168+
if (job.getName().equals(messageAttribute)) {
169+
done.set(true);
170+
ackReplyConsumer.ack();
171+
} else {
172+
ackReplyConsumer.nack();
173+
}
174+
}
166175
}
167176
// [END dlp_inspect_datastore]

dlp/snippets/snippets/src/main/java/dlp/snippets/InspectGcsFile.java

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package dlp.snippets;
1818

1919
// [START dlp_inspect_gcs]
20-
20+
import com.google.api.core.SettableApiFuture;
2121
import com.google.cloud.dlp.v2.DlpServiceClient;
2222
import com.google.cloud.pubsub.v1.AckReplyConsumer;
2323
import com.google.cloud.pubsub.v1.MessageReceiver;
@@ -52,14 +52,14 @@ public static void inspectGcsFile() throws InterruptedException, ExecutionExcept
5252
// TODO(developer): Replace these variables before running the sample.
5353
String projectId = "your-project-id";
5454
String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt";
55-
String pubSubTopicId = "your-pubsub-topic-id";
56-
String pubSubSubscriptionId = "your-pubsub-subscription-id";
57-
inspectGcsFile(projectId, gcsUri, pubSubTopicId, pubSubSubscriptionId);
55+
String topicId = "your-pubsub-topic-id";
56+
String subscriptionId = "your-pubsub-subscription-id";
57+
inspectGcsFile(projectId, gcsUri, topicId, subscriptionId);
5858
}
5959

6060
// Inspects a file in a Google Cloud Storage Bucket.
6161
public static void inspectGcsFile(
62-
String projectId, String gcsUri, String pubSubTopicId, String pubSubSubscriptionName)
62+
String projectId, String gcsUri, String topicId, String subscriptionId)
6363
throws ExecutionException, InterruptedException, IOException {
6464
// Initialize client that will be used to send requests. This client only needs to be created
6565
// once, and can be reused for multiple requests. After completing all of your requests, call
@@ -84,7 +84,7 @@ public static void inspectGcsFile(
8484
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();
8585

8686
// Specify the action that is triggered when the job completes.
87-
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
87+
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
8888
Action.PublishToPubSub publishToPubSub =
8989
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
9090
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
@@ -105,36 +105,32 @@ public static void inspectGcsFile(
105105
.build();
106106

107107
// Use the client to send the request.
108-
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
109-
System.out.println("Job created: " + job.getName());
108+
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
109+
System.out.println("Job created: " + dlpJob.getName());
110+
111+
// Set up a Pub/Sub subscriber to listen on the job completion status
112+
final SettableApiFuture<Boolean> done = SettableApiFuture.create();
110113

111-
// Set up a Pub/Sub subscriber to listen for the job completion status
112-
SettableFuture<Void> jobDone = SettableFuture.create();
113114
ProjectSubscriptionName subscriptionName =
114-
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
115-
MessageReceiver handleMessage =
115+
ProjectSubscriptionName.of(projectId, subscriptionId);
116+
117+
MessageReceiver messageHandler =
116118
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
117-
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
118-
if (job.getName().equals(messageAttribute)) {
119-
jobDone.set(null);
120-
ackReplyConsumer.ack();
121-
} else {
122-
ackReplyConsumer.nack();
123-
}
119+
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
124120
};
125-
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
126-
subscriber.startAsync(); // Let the subscriber listen to messages
121+
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
122+
subscriber.startAsync();
127123

128124
// Wait for the original job to complete
129125
try {
130-
jobDone.get(10, TimeUnit.MINUTES);
126+
done.get(15, TimeUnit.MINUTES);
131127
} catch (TimeoutException e) {
132-
System.out.println("Job was not completed after 10 minutes.");
128+
System.out.println("Job was not completed after 15 minutes.");
133129
return;
134130
}
135131

136132
// Get the latest state of the job from the service
137-
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
133+
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
138134
DlpJob completedJob = dlp.getDlpJob(request);
139135

140136
// Parse the response and process results.
@@ -147,5 +143,20 @@ public static void inspectGcsFile(
147143
}
148144
}
149145
}
146+
147+
// handleMessage injects the job and settableFuture into the message reciever interface
148+
private static void handleMessage(
149+
DlpJob job,
150+
SettableApiFuture<Boolean> done,
151+
PubsubMessage pubsubMessage,
152+
AckReplyConsumer ackReplyConsumer) {
153+
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
154+
if (job.getName().equals(messageAttribute)) {
155+
done.set(true);
156+
ackReplyConsumer.ack();
157+
} else {
158+
ackReplyConsumer.nack();
159+
}
160+
}
150161
}
151162
// [END dlp_inspect_gcs]

0 commit comments

Comments
 (0)