Skip to content

Commit 7b347a1

Browse files
minherztelpirion
authored andcommitted
Add log redaction tutorial sample code (#9066)
* doc(logging): add log redaction boilerplate code Create logging/redaction folder to store assets for log redaction tutorial. Assets will include: * documentation (README.md) * boilerplate code to start the tutorial * final version of the code * chore(fix): refactor dataflow fix package imports; add requirements.txt file to allow import google-cloud-logging that is not part of Beam SDK; fix logging_v2 client initialization; refactor some class, function and variable names; * chore(doc): add custom image Dockerfile Add a reference example for custom image Dockerfile * chore(fix): refactor code sample for tutorial Add "final" version of the code for tutorial. Format boilerplate code to keep same quote style and to add placeholder comments for code addition. * chore(fix): adding redaction step to pipeline Execute redaction function before sending logs in the pipeline. Refactoring argument names. * chore: fix compilation error * chore: comment text correction * chore: update codeowners * chore: change indent size from 2 to 4 * chore: fix lint errors * chore: minor text fix * chore: updating DEE observability ownership * chore: address style comments from dandhlee@
1 parent b180b17 commit 7b347a1

File tree

7 files changed

+401
-3
lines changed

7 files changed

+401
-3
lines changed

.github/CODEOWNERS

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,13 @@
6262
/kubernetes_engine/**/* @GoogleCloudPlatform/python-samples-reviewers
6363
/kubernetes_engine/django_tutorial/**/* @glasnt @GoogleCloudPlatform/python-samples-reviewers
6464
/language/**/* @GoogleCloudPlatform/dee-data-ai @GoogleCloudPlatform/python-samples-reviewers
65+
/logging/**/* @GoogleCloudPlatform/dee-observability @GoogleCloudPlatform/python-samples-reviewers
6566
/media_cdn/**/* @justin-mp @msampathkumar @GoogleCloudPlatform/python-samples-reviewers
6667
/memorystore/**/* @GoogleCloudPlatform/python-samples-reviewers
6768
/ml_engine/**/* @ivanmkc @GoogleCloudPlatform/python-samples-reviewers
6869
/monitoring/**/* @GoogleCloudPlatform/dee-observability @GoogleCloudPlatform/python-samples-reviewers
69-
/monitoring/opencensus @yuriatgoogle @GoogleCloudPlatform/python-samples-reviewers
70-
/monitoring/prometheus @yuriatgoogle @GoogleCloudPlatform/python-samples-reviewers
70+
/monitoring/opencensus @yuriatgoogle @GoogleCloudPlatform/dee-observability @GoogleCloudPlatform/python-samples-reviewers
71+
/monitoring/prometheus @yuriatgoogle @GoogleCloudPlatform/dee-observability @GoogleCloudPlatform/python-samples-reviewers
7172
/notebooks/**/* @alixhami @GoogleCloudPlatform/python-samples-reviewers
7273
/optimization/**/* @GoogleCloudPlatform/dee-data-ai @GoogleCloudPlatform/python-samples-reviewers
7374
/opencensus/**/* @GoogleCloudPlatform/python-samples-reviewers
@@ -83,7 +84,7 @@
8384
/storage/**/* @GoogleCloudPlatform/cloud-storage-dpes @GoogleCloudPlatform/python-samples-reviewers
8485
/storagetransfer/**/* @GoogleCloudPlatform/cloud-storage-dpes @GoogleCloudPlatform/python-samples-reviewers
8586
/texttospeech/**/* @GoogleCloudPlatform/dee-data-ai @GoogleCloudPlatform/python-samples-reviewers
86-
/trace/**/* @ymotongpoo @GoogleCloudPlatform/python-samples-reviewers
87+
/trace/**/* @ymotongpoo @GoogleCloudPlatform/dee-observability @GoogleCloudPlatform/python-samples-reviewers
8788
/translate/**/* @nicain @GoogleCloudPlatform/python-samples-reviewers
8889
/talent/**/* @GoogleCloudPlatform/python-samples-reviewers
8990
/vision/**/* @GoogleCloudPlatform/python-samples-reviewers

.github/blunderbuss.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ assign_issues_by:
144144
- GoogleCloudPlatform/python-samples-reviewers
145145
- ivanmkc
146146
- labels:
147+
- 'api: logging'
147148
- 'api: monitoring'
149+
- 'api: trace'
148150
to:
149151
- GoogleCloudPlatform/dee-observability
150152
- labels:

logging/redaction/Dockerfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# From apache/beam_python3.9_sdk:2.43.0
2+
FROM apache/beam_python3.9_sdk@sha256:372abe1d342447118d517ed1325969115e2df03f09fc7604e606c7552380b2ce
3+
4+
# Install google-cloud-logging package that is missing in Beam SDK
5+
COPY requirements.txt /tmp
6+
RUN pip3 install --upgrade pip && pip3 install -r /tmp/requirements.txt && pip3 check

logging/redaction/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Logging redaction tutorial code samples
2+
3+
> **Warning**
4+
> This section is still **W**ork **I**n **P**rogress.
5+
> Cloud Shell button now opens this README. It will open the tutorial _AFTER_ its official launch.
6+
> Tests to validate the code samples will be added.
7+
8+
This section contains code that is used in the "Redact confidential information in logs" tutorial.
9+
You can open the tutorial in Cloud Shell:
10+
11+
[![Open in Cloud Shell][shell_img]][shell_link]
12+
13+
_NOTE:_ You will need a valid Google Cloud credentials to open the tutorial.
14+
15+
[shell_img]: http://gstatic.com/cloudssh/images/open-btn.png
16+
[shell_link]: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=logging/redaction/README.md

logging/redaction/log_redaction.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
import argparse
17+
import json
18+
import logging
19+
from typing import List
20+
21+
from apache_beam import CombineFn, CombineGlobally, DoFn, io, ParDo, Pipeline, WindowInto
22+
from apache_beam.error import PipelineError
23+
from apache_beam.options.pipeline_options import PipelineOptions
24+
from apache_beam.transforms.window import FixedWindows
25+
26+
from google.cloud import logging_v2
27+
28+
29+
# TODO: Place inspection and de-identification configurations
30+
31+
class PayloadAsJson(DoFn):
32+
'''Convert PubSub message payload to UTF-8 and return as JSON'''
33+
def process(self, element):
34+
yield json.loads(element.decode('utf-8'))
35+
36+
37+
class BatchPayloads(CombineFn):
38+
'''Opinionated way to batch all payloads in the window'''
39+
40+
def create_accumulator(self):
41+
return []
42+
43+
def add_input(self, accumulator, input):
44+
accumulator.append(input)
45+
return accumulator
46+
47+
def merge_accumulators(self, accumulators):
48+
merged = [
49+
item
50+
for accumulator in accumulators
51+
for item in accumulator
52+
]
53+
return merged
54+
55+
def extract_output(self, accumulator):
56+
return accumulator
57+
58+
59+
# TODO: Placeholder for LogRedaction class
60+
61+
62+
class IngestLogs(DoFn):
63+
'''Ingest payloads into destination log'''
64+
65+
def __init__(self, destination_log_name):
66+
self.destination_log_name = destination_log_name
67+
self.logger = None
68+
69+
def _replace_log_name(self, entry):
70+
# update log name in the entry with destination log
71+
entry['logName'] = self.logger.name
72+
return entry
73+
74+
def setup(self):
75+
# initialize logging client
76+
if self.logger:
77+
return
78+
79+
logging_client = logging_v2.Client()
80+
if not logging_client:
81+
logging.error('Cannot create GCP Logging Client')
82+
raise PipelineError('Cannot create GCP Logging Client')
83+
self.logger = logging_client.logger(self.destination_log_name)
84+
if not self.logger:
85+
logging.error('Google client library cannot create Logger object')
86+
raise PipelineError('Google client library cannot create Logger object')
87+
88+
def process(self, element):
89+
if self.logger:
90+
logs = list(map(self._replace_log_name, element))
91+
self.logger.client.logging_api.write_entries(logs)
92+
yield logs
93+
94+
95+
def run(
96+
pubsub_subscription: str,
97+
destination_log_name: str,
98+
window_size: float,
99+
pipeline_args: List[str] = None
100+
) -> None:
101+
'''Runs Dataflow pipeline'''
102+
103+
pipeline_options = PipelineOptions(
104+
pipeline_args,
105+
streaming=True,
106+
save_main_session=True
107+
)
108+
pipeline = Pipeline(options=pipeline_options)
109+
_ = (
110+
pipeline
111+
| 'Read log entries from Pub/Sub' >> io.ReadFromPubSub(subscription=pubsub_subscription)
112+
| 'Convert log entry payload to Json' >> ParDo(PayloadAsJson())
113+
| 'Aggregate payloads in fixed time intervals' >> WindowInto(FixedWindows(window_size))
114+
# Optimize Google API consumption and avoid possible throttling
115+
# by calling APIs for batched data and not per each element
116+
| 'Batch aggregated payloads' >> CombineGlobally(BatchPayloads()).without_defaults()
117+
# TODO: Placeholder for redaction transformation
118+
| 'Ingest to output log' >> ParDo(IngestLogs(destination_log_name))
119+
)
120+
pipeline.run()
121+
122+
123+
if __name__ == '__main__':
124+
logging.getLogger().setLevel(logging.INFO)
125+
126+
parser = argparse.ArgumentParser()
127+
parser.add_argument(
128+
'--pubsub_subscription',
129+
help='The Cloud Pub/Sub subscription to read from in the format '
130+
'"projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>".',
131+
)
132+
parser.add_argument(
133+
'--destination_log_name',
134+
help='The log name to ingest log entries in the format '
135+
'"projects/<PROJECT_ID>/logs/<LOG_ID>".',
136+
)
137+
parser.add_argument(
138+
'--window_size',
139+
type=float,
140+
default=60.0,
141+
help='Output file\'s window size in seconds.',
142+
)
143+
known_args, pipeline_args = parser.parse_known_args()
144+
145+
run(
146+
known_args.pubsub_subscription,
147+
known_args.destination_log_name,
148+
known_args.window_size,
149+
pipeline_args,
150+
)

0 commit comments

Comments
 (0)