Skip to content

[DLP] Implemented dlp_deidentify_time_extract sample #10235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions dlp/snippets/deid.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,137 @@ def write_data(data: types.storage.Value) -> str:
# [END dlp_deidentify_date_shift]


# [START dlp_deidentify_time_extract]
import csv # noqa: F811, E402, I100
from datetime import datetime # noqa: F811, E402, I100
from typing import List # noqa: F811, E402

import google.cloud.dlp # noqa: F811, E402


def deidentify_with_time_extract(
    project: str,
    date_fields: List[str],
    input_csv_file: str,
    output_csv_file: str,
) -> None:
    """Uses the Data Loss Prevention API to deidentify dates in a CSV file
    through time part extraction (only the YEAR is kept).

    Args:
        project: The Google Cloud project id to use as a parent resource.
        date_fields: A list of (date) fields in CSV file to de-identify
            through time extraction. Example: ['birth_date', 'register_date'].
            Date values in format: mm/DD/YYYY are considered as part of this
            sample.
        input_csv_file: The path to the CSV file to deidentify. The first row
            of the file must specify column names, and all other rows must
            contain valid values.
        output_csv_file: The output file path to save the time extracted data.
    """

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the date field list to Protobuf-compatible dicts. Using a
    # list (not a lazy map object) so the value can be safely embedded in
    # the request dict; an empty or None input yields an empty list.
    date_fields = [{"name": field} for field in (date_fields or [])]

    # Read the whole CSV into memory. newline="" is what the csv module
    # documentation prescribes for files passed to csv.reader.
    with open(input_csv_file, newline="") as csvfile:
        csv_lines = list(csv.reader(csvfile))

    def map_data(value: str) -> dict:
        # Values matching mm/DD/YYYY become DLP date_value dicts; anything
        # that fails to parse is passed through unchanged as a string.
        try:
            date = datetime.strptime(value, "%m/%d/%Y")
            return {
                "date_value": {
                    "year": date.year, "month": date.month, "day": date.day
                }
            }
        except ValueError:
            return {"string_value": value}

    # Convert CSV rows to protobuf-compatible dictionaries: the first row
    # holds the column headers, the remaining rows hold the data.
    csv_headers = [{"name": header} for header in csv_lines[0]]
    csv_rows = [
        {"values": [map_data(value) for value in row]} for row in csv_lines[1:]
    ]

    # Construct the `item` for table to de-identify.
    item = {"table": {"headers": csv_headers, "rows": csv_rows}}

    # Construct deidentify configuration dictionary: extract only the YEAR
    # part from every value in the requested date fields.
    deidentify_config = {
        "record_transformations": {
            "field_transformations": [
                {
                    "primitive_transformation": {
                        "time_part_config": {
                            "part_to_extract": "YEAR"
                        }
                    },
                    "fields": date_fields,
                }
            ]
        }
    }

    # Write to CSV helper methods.
    def write_header(header) -> str:
        return header.name

    def write_data(data) -> str:
        # Non-date cells come back as string_value; date cells come back
        # as date_value parts, reassembled here as month/day/year.
        return data.string_value or "{}/{}/{}".format(
            data.date_value.month,
            data.date_value.day,
            data.date_value.year,
        )

    # Convert the project id into a full resource id.
    parent = f"projects/{project}"

    # Call the API
    response = dlp.deidentify_content(
        request={
            "parent": parent,
            "deidentify_config": deidentify_config,
            "item": item,
        }
    )

    # Print the result.
    print("Table after de-identification: {}".format(response.item.table))

    # Write results to CSV file. newline="" prevents the csv writer from
    # producing blank lines between rows on platforms using \r\n.
    with open(output_csv_file, "w", newline="") as csvfile:
        write_file = csv.writer(csvfile, delimiter=",")
        write_file.writerow(map(write_header, response.item.table.headers))
        for row in response.item.table.rows:
            write_file.writerow(map(write_data, row.values))

    # Print status.
    print(f"Successfully saved date-extracted output to {output_csv_file}")


# [END dlp_deidentify_time_extract]


# [START dlp_deidentify_replace_infotype]
from typing import List # noqa: F811, E402, I100

Expand Down Expand Up @@ -2124,6 +2255,30 @@ def deidentify_table_with_multiple_crypto_hash(
"key_name.",
)

# CLI wiring for the `deid_time_extract` sub-command; the arguments mirror
# the signature of deidentify_with_time_extract().
time_extract_parser = subparsers.add_parser(
    "deid_time_extract",
    help="Deidentify dates in a CSV file by extracting a date part.",
)
time_extract_parser.add_argument(
    "project",
    help="The Google Cloud project id to use as a parent resource.",
)
time_extract_parser.add_argument(
    "input_csv_file",
    help="The path to the CSV file to deidentify. The first row of the "
    "file must specify column names, and all other rows must contain "
    "valid values.",
)
time_extract_parser.add_argument(
    "date_fields",
    nargs="+",  # one or more field names, space-separated on the CLI
    help="The list of date fields in the CSV file to de-identify. Example: "
    "['birth_date', 'register_date']",
)
time_extract_parser.add_argument(
    "output_csv_file", help="The path to save the time-extracted data."
)

replace_with_infotype_parser = subparsers.add_parser(
"replace_with_infotype",
help="Deidentify sensitive data in a string by replacing it with the "
Expand Down Expand Up @@ -2485,6 +2640,13 @@ def deidentify_table_with_multiple_crypto_hash(
wrapped_key=args.wrapped_key,
key_name=args.key_name,
)
elif args.content == "deid_time_extract":
    # Dispatch for the `deid_time_extract` sub-command defined above.
    deidentify_with_time_extract(
        args.project,
        date_fields=args.date_fields,
        input_csv_file=args.input_csv_file,
        output_csv_file=args.output_csv_file,
    )
elif args.content == "replace_with_infotype":
deidentify_with_replace_infotype(
args.project,
Expand Down
15 changes: 15 additions & 0 deletions dlp/snippets/deid_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,21 @@ def test_deidentify_with_date_shift_using_context_field(
assert "Successful" in out


def test_deidentify_with_time_extract(tempdir: TextIO, capsys: pytest.CaptureFixture) -> None:
    """Smoke test: run the time-extract sample against the shared CSV fixture
    and verify that the success message is printed.
    """
    output_filepath = os.path.join(str(tempdir), "year-extracted.csv")

    deid.deidentify_with_time_extract(
        GCLOUD_PROJECT,
        input_csv_file=CSV_FILE,
        output_csv_file=output_filepath,
        date_fields=DATE_FIELDS,
    )

    out, _ = capsys.readouterr()

    # The sample prints "Successfully saved ..." after writing the output file.
    assert "Successful" in out


def test_reidentify_with_fpe(capsys: pytest.CaptureFixture) -> None:
labeled_fpe_string = "My SSN is SSN_TOKEN(9):731997681"

Expand Down