18
18
19
19
// [START dlp_inspect_datastore]
20
20
21
+ import com .google .api .core .SettableApiFuture ;
21
22
import com .google .cloud .dlp .v2 .DlpServiceClient ;
22
23
import com .google .cloud .pubsub .v1 .AckReplyConsumer ;
23
24
import com .google .cloud .pubsub .v1 .MessageReceiver ;
24
25
import com .google .cloud .pubsub .v1 .Subscriber ;
25
- import com .google .common .util .concurrent .SettableFuture ;
26
26
import com .google .privacy .dlp .v2 .Action ;
27
27
import com .google .privacy .dlp .v2 .CreateDlpJobRequest ;
28
28
import com .google .privacy .dlp .v2 .DatastoreOptions ;
@@ -55,19 +55,18 @@ public static void insepctDatastoreEntity()
55
55
String projectId = "your-project-id" ;
56
56
String datastoreNamespace = "your-datastore-namespace" ;
57
57
String datastoreKind = "your-datastore-kind" ;
58
- String pubSubTopicId = "your-pubsub-topic-id" ;
59
- String pubSubSubscriptionId = "your-pubsub-subscription-id" ;
60
- insepctDatastoreEntity (
61
- projectId , datastoreNamespace , datastoreKind , pubSubTopicId , pubSubSubscriptionId );
58
+ String topicId = "your-pubsub-topic-id" ;
59
+ String subscriptionId = "your-pubsub-subscription-id" ;
60
+ insepctDatastoreEntity (projectId , datastoreNamespace , datastoreKind , topicId , subscriptionId );
62
61
}
63
62
64
63
// Inspects a Datastore Entity.
65
64
public static void insepctDatastoreEntity (
66
65
String projectId ,
67
66
String datastoreNamespce ,
68
67
String datastoreKind ,
69
- String pubSubTopicId ,
70
- String pubSubSubscriptionName )
68
+ String topicId ,
69
+ String subscriptionId )
71
70
throws ExecutionException , InterruptedException , IOException {
72
71
// Initialize client that will be used to send requests. This client only needs to be created
73
72
// once, and can be reused for multiple requests. After completing all of your requests, call
@@ -99,7 +98,7 @@ public static void insepctDatastoreEntity(
99
98
InspectConfig .newBuilder ().addAllInfoTypes (infoTypes ).setIncludeQuote (true ).build ();
100
99
101
100
// Specify the action that is triggered when the job completes.
102
- String pubSubTopic = String .format ("projects/%s/topics/%s" , projectId , pubSubTopicId );
101
+ String pubSubTopic = String .format ("projects/%s/topics/%s" , projectId , topicId );
103
102
Action .PublishToPubSub publishToPubSub =
104
103
Action .PublishToPubSub .newBuilder ().setTopic (pubSubTopic ).build ();
105
104
Action action = Action .newBuilder ().setPubSub (publishToPubSub ).build ();
@@ -120,37 +119,32 @@ public static void insepctDatastoreEntity(
120
119
.build ();
121
120
122
121
// Use the client to send the request.
123
- final DlpJob job = dlp .createDlpJob (createDlpJobRequest );
124
- System .out .println ("Job created: " + job .getName ());
122
+ final DlpJob dlpJob = dlp .createDlpJob (createDlpJobRequest );
123
+ System .out .println ("Job created: " + dlpJob .getName ());
124
+
125
+ // Set up a Pub/Sub subscriber to listen on the job completion status
126
+ final SettableApiFuture <Boolean > done = SettableApiFuture .create ();
125
127
126
- // Set up a Pub/Sub subscriber to listen for the job completion status
127
- SettableFuture <Void > jobDone = SettableFuture .create ();
128
128
ProjectSubscriptionName subscriptionName =
129
- ProjectSubscriptionName .of (projectId , pubSubSubscriptionName );
130
- MessageReceiver handleMessage =
129
+ ProjectSubscriptionName .of (projectId , subscriptionId );
130
+
131
+ MessageReceiver messageHandler =
131
132
(PubsubMessage pubsubMessage , AckReplyConsumer ackReplyConsumer ) -> {
132
- String messageAttribute = pubsubMessage .getAttributesMap ().get ("DlpJobName" );
133
- if (job .getName ().equals (messageAttribute )) {
134
- jobDone .set (null );
135
- ackReplyConsumer .ack ();
136
- } else {
137
- ackReplyConsumer .nack ();
138
- ;
139
- }
133
+ handleMessage (dlpJob , done , pubsubMessage , ackReplyConsumer );
140
134
};
141
- Subscriber subscriber = Subscriber .newBuilder (subscriptionName , handleMessage ).build ();
142
- subscriber .startAsync (); // Let the subscriber listen to messages
135
+ Subscriber subscriber = Subscriber .newBuilder (subscriptionName , messageHandler ).build ();
136
+ subscriber .startAsync ();
143
137
144
138
// Wait for the original job to complete
145
139
try {
146
- jobDone .get (10 , TimeUnit .MINUTES );
140
+ done .get (15 , TimeUnit .MINUTES );
147
141
} catch (TimeoutException e ) {
148
- System .out .println ("Job was not completed after 10 minutes." );
142
+ System .out .println ("Job was not completed after 15 minutes." );
149
143
return ;
150
144
}
151
145
152
146
// Get the latest state of the job from the service
153
- GetDlpJobRequest request = GetDlpJobRequest .newBuilder ().setName (job .getName ()).build ();
147
+ GetDlpJobRequest request = GetDlpJobRequest .newBuilder ().setName (dlpJob .getName ()).build ();
154
148
DlpJob completedJob = dlp .getDlpJob (request );
155
149
156
150
// Parse the response and process results.
@@ -163,5 +157,20 @@ public static void insepctDatastoreEntity(
163
157
}
164
158
}
165
159
}
160
+
161
+ // handleMessage injects the job and settableFuture into the message reciever interface
162
+ private static void handleMessage (
163
+ DlpJob job ,
164
+ SettableApiFuture <Boolean > done ,
165
+ PubsubMessage pubsubMessage ,
166
+ AckReplyConsumer ackReplyConsumer ) {
167
+ String messageAttribute = pubsubMessage .getAttributesMap ().get ("DlpJobName" );
168
+ if (job .getName ().equals (messageAttribute )) {
169
+ done .set (true );
170
+ ackReplyConsumer .ack ();
171
+ } else {
172
+ ackReplyConsumer .nack ();
173
+ }
174
+ }
166
175
}
167
176
// [END dlp_inspect_datastore]
0 commit comments