-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Support for prowler scan #12449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Support for prowler scan #12449
Conversation
- Add test_mode parameter to avoid database operations during tests - Improve CSV parser to handle both comma and semicolon delimiters - Enhance JSON parsing to extract fields from multiple possible locations - Fix sequence of operations to ensure findings are saved before setting notes - Add safe handling for provider values to prevent NoneType errors - Support all cloud providers (AWS, Azure, GCP, Kubernetes) in both CSV and JSON formats - Store notes content in unsaved_notes during test mode
1. Sample scan files for AWS, Azure, GCP, and Kubernetes in both CSV and JSON formats - Added to unittests/scans/prowler/ to cover all supported cloud providers - Files represent real-world scan outputs with typical findings 2. Enhanced test_prowler_parser.py - Added tests for file-based parsing of all cloud providers and formats - Ensured verification of key fields (title, severity, notes, etc.) 3. Added test_prowler_stringio.py - Implemented in-memory tests using StringIO to avoid file I/O - Tests both JSON and CSV parsing for all cloud providers - Verifies correct processing of unique fields per provider - Tests specific edge cases like delimiter detection and field extraction
This pull request contains potential security risks, including example security scan files with placeholders that could expose sensitive information and a file parser vulnerability that might enable a Denial of Service attack by processing large files without proper size validation.
|
Vulnerability | Information Disclosure via Example Security Scan Data |
---|---|
Description | Multiple example security scan output files contain detailed placeholders that, if accidentally populated with real data, could expose sensitive organizational information about cloud infrastructure, security configurations, and potential vulnerabilities. |
django-DefectDojo/unittests/scans/prowler/examples/output/example_output_aws.ocsf.json
Lines 1 to 625 in 46d5d33
[ | |
{ | |
"message": "IAM Access Analyzer in account <account_uid> is not enabled.", | |
"metadata": { | |
"event_code": "accessanalyzer_enabled", | |
"product": { | |
"name": "Prowler", | |
"uid": "prowler", | |
"vendor_name": "Prowler", | |
"version": "<prowler_version>" | |
}, | |
"profiles": [ | |
"cloud", | |
"datetime" | |
], | |
"tenant_uid": "", | |
"version": "1.4.0" | |
}, | |
"severity_id": 2, | |
"severity": "Low", | |
"status": "New", | |
"status_code": "FAIL", | |
"status_detail": "IAM Access Analyzer in account <account_uid> is not enabled.", | |
"status_id": 1, | |
"unmapped": { | |
"related_url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/what-is-access-analyzer.html", | |
"categories": [], | |
"depends_on": [], | |
"related_to": [], | |
"notes": "", | |
"compliance": { | |
"CIS-1.4": [ | |
"1.20" | |
], | |
"CIS-1.5": [ | |
"1.20" | |
], | |
"KISA-ISMS-P-2023": [ | |
"2.5.6", | |
"2.6.4", | |
"2.8.1", | |
"2.8.2" | |
], | |
"CIS-2.0": [ | |
"1.20" | |
], | |
"KISA-ISMS-P-2023-korean": [ | |
"2.5.6", | |
"2.6.4", | |
"2.8.1", | |
"2.8.2" | |
], | |
"AWS-Account-Security-Onboarding": [ | |
"Enabled security services", | |
"Create analyzers in each active regions", | |
"Verify that events are present in SecurityHub aggregated view" | |
], | |
"CIS-3.0": [ | |
"1.20" | |
] | |
} | |
}, | |
"activity_name": "Create", | |
"activity_id": 1, | |
"finding_info": { | |
"created_time": 1739539623, | |
"created_time_dt": "2025-02-14T14:27:03.913874", | |
"desc": "Check if IAM Access Analyzer is enabled", | |
"product_uid": "prowler", | |
"title": "Check if IAM Access Analyzer is enabled", | |
"types": [ | |
"IAM" | |
], | |
"uid": "<finding_uid>" | |
}, | |
"resources": [ | |
{ | |
"cloud_partition": "aws", | |
"region": "<region>", | |
"data": { | |
"details": "", | |
"metadata": { | |
"arn": "<resource_arn>", | |
"name": "<resource_name>", | |
"status": "NOT_AVAILABLE", | |
"findings": [], | |
"tags": [], | |
"type": "", | |
"region": "<region>" | |
} | |
}, | |
"group": { | |
"name": "accessanalyzer" | |
}, | |
"labels": [], | |
"name": "<resource_name>", | |
"type": "Other", | |
"uid": "<resource_uid>" | |
} | |
], | |
"category_name": "Findings", | |
"category_uid": 2, | |
"class_name": "Detection Finding", | |
"class_uid": 2004, | |
"cloud": { | |
"account": { | |
"name": "", | |
"type": "AWS Account", | |
"type_id": 10, | |
"uid": "<account_uid>", | |
"labels": [] | |
}, | |
"org": { | |
"name": "", | |
"uid": "" | |
}, | |
"provider": "aws", | |
"region": "<region>" | |
}, | |
"remediation": { | |
"desc": "Enable IAM Access Analyzer for all accounts, create analyzer and take action over it is recommendations (IAM Access Analyzer is available at no additional cost).", | |
"references": [ | |
"aws accessanalyzer create-analyzer --analyzer-name <NAME> --type <ACCOUNT|ORGANIZATION>", | |
"https://docs.aws.amazon.com/IAM/latest/UserGuide/what-is-access-analyzer.html" | |
] | |
}, | |
"risk_details": "AWS IAM Access Analyzer helps you identify the resources in your organization and accounts, such as Amazon S3 buckets or IAM roles, that are shared with an external entity. This lets you identify unintended access to your resources and data, which is a security risk. IAM Access Analyzer uses a form of mathematical analysis called automated reasoning, which applies logic and mathematical inference to determine all possible access paths allowed by a resource policy.", | |
"time": 1739539623, | |
"time_dt": "2025-02-14T14:27:03.913874", | |
"type_uid": 200401, | |
"type_name": "Detection Finding: Create" | |
}, | |
{ | |
"message": "Login to the AWS Console. Choose your account name on the top right of the window -> My Account -> Contact Information.", | |
"metadata": { | |
"event_code": "account_maintain_current_contact_details", | |
"product": { | |
"name": "Prowler", | |
"uid": "prowler", | |
"vendor_name": "Prowler", | |
"version": "<prowler_version>" | |
}, | |
"profiles": [ | |
"cloud", | |
"datetime" | |
], | |
"tenant_uid": "", | |
"version": "1.4.0" | |
}, | |
"severity_id": 3, | |
"severity": "Medium", | |
"status": "New", | |
"status_code": "MANUAL", | |
"status_detail": "Login to the AWS Console. Choose your account name on the top right of the window -> My Account -> Contact Information.", | |
"status_id": 1, | |
"unmapped": { | |
"related_url": "", | |
"categories": [], | |
"depends_on": [], | |
"related_to": [], | |
"notes": "", | |
"compliance": { | |
"CIS-1.4": [ | |
"1.1" | |
], | |
"CIS-1.5": [ | |
"1.1" | |
], | |
"KISA-ISMS-P-2023": [ | |
"2.1.3" | |
], | |
"CIS-2.0": [ | |
"1.1" | |
], | |
"KISA-ISMS-P-2023-korean": [ | |
"2.1.3" | |
], | |
"AWS-Well-Architected-Framework-Security-Pillar": [ | |
"SEC03-BP03", | |
"SEC10-BP01" | |
], | |
"AWS-Account-Security-Onboarding": [ | |
"Billing, emergency, security contacts" | |
], | |
"CIS-3.0": [ | |
"1.1" | |
], | |
"ENS-RD2022": [ | |
"op.ext.7.aws.am.1" | |
] | |
} | |
}, | |
"activity_name": "Create", | |
"activity_id": 1, | |
"finding_info": { | |
"created_time": 1739539623, | |
"created_time_dt": "2025-02-14T14:27:03.913874", | |
"desc": "Maintain current contact details.", | |
"product_uid": "prowler", | |
"title": "Maintain current contact details.", | |
"types": [ | |
"IAM" | |
], | |
"uid": "<finding_uid>" | |
}, | |
"resources": [ | |
{ | |
"cloud_partition": "aws", | |
"region": "<region>", | |
"data": { | |
"details": "", | |
"metadata": { | |
"type": "PRIMARY", | |
"email": null, | |
"name": "<account_name>", | |
"phone_number": "<value>" | |
} | |
}, | |
"group": { | |
"name": "account" | |
}, | |
"labels": [], | |
"name": "<account_uid>", | |
"type": "Other", | |
"uid": "arn:aws:iam::<account_uid>:root" | |
} | |
], | |
"category_name": "Findings", | |
"category_uid": 2, | |
"class_name": "Detection Finding", | |
"class_uid": 2004, | |
"cloud": { | |
"account": { | |
"name": "", | |
"type": "AWS Account", | |
"type_id": 10, | |
"uid": "<account_uid>", | |
"labels": [] | |
}, | |
"org": { | |
"name": "", | |
"uid": "" | |
}, | |
"provider": "aws", | |
"region": "<region>" | |
}, | |
"remediation": { | |
"desc": "Using the Billing and Cost Management console complete contact details.", | |
"references": [ | |
"No command available.", | |
"https://docs.prowler.com/checks/aws/iam-policies/iam_18-maintain-contact-details#aws-console", | |
"https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-update-contact.html" | |
] | |
}, | |
"risk_details": "Ensure contact email and telephone details for AWS accounts are current and map to more than one individual in your organization. An AWS account supports a number of contact details, and AWS will use these to contact the account owner if activity judged to be in breach of Acceptable Use Policy. If an AWS account is observed to be behaving in a prohibited or suspicious manner, AWS will attempt to contact the account owner by email and phone using the contact details listed. If this is unsuccessful and the account behavior needs urgent mitigation, proactive measures may be taken, including throttling of traffic between the account exhibiting suspicious behavior and the AWS API endpoints and the Internet. This will result in impaired service to and from the account in question.", | |
"time": 1739539623, | |
"time_dt": "2025-02-14T14:27:03.913874", | |
"type_uid": 200401, | |
"type_name": "Detection Finding: Create" | |
}, | |
{ | |
"message": "SECURITY, BILLING and OPERATIONS contacts not found or they are not different between each other and between ROOT contact.", | |
"metadata": { | |
"event_code": "account_maintain_different_contact_details_to_security_billing_and_operations", | |
"product": { | |
"name": "Prowler", | |
"uid": "prowler", | |
"vendor_name": "Prowler", | |
"version": "<prowler_version>" | |
}, | |
"profiles": [ | |
"cloud", | |
"datetime" | |
], | |
"tenant_uid": "", | |
"version": "1.4.0" | |
}, | |
"severity_id": 3, | |
"severity": "Medium", | |
"status": "New", | |
"status_code": "FAIL", | |
"status_detail": "SECURITY, BILLING and OPERATIONS contacts not found or they are not different between each other and between ROOT contact.", | |
"status_id": 1, | |
"unmapped": { | |
"related_url": "https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-update-contact.html", | |
"categories": [], | |
"depends_on": [], | |
"related_to": [], | |
"notes": "", | |
"compliance": { | |
"KISA-ISMS-P-2023": [ | |
"2.1.3" | |
], | |
"KISA-ISMS-P-2023-korean": [ | |
"2.1.3" | |
] | |
} | |
}, | |
"activity_name": "Create", | |
"activity_id": 1, | |
"finding_info": { | |
"created_time": 1739539623, | |
"created_time_dt": "2025-02-14T14:27:03.913874", | |
"desc": "Maintain different contact details to security, billing and operations.", | |
"product_uid": "prowler", | |
"title": "Maintain different contact details to security, billing and operations.", | |
"types": [ | |
"IAM" | |
], | |
"uid": "<finding_uid>" | |
}, | |
"resources": [ | |
{ | |
"cloud_partition": "aws", | |
"region": "<region>", | |
"data": { | |
"details": "", | |
"metadata": { | |
"type": "PRIMARY", | |
"email": null, | |
"name": "<account_name>", | |
"phone_number": "<value>" | |
} | |
}, | |
"group": { | |
"name": "account" | |
}, | |
"labels": [], | |
"name": "<account_uid>", | |
"type": "Other", | |
"uid": "arn:aws:iam::<account_uid>:root" | |
} | |
], | |
"category_name": "Findings", | |
"category_uid": 2, | |
"class_name": "Detection Finding", | |
"class_uid": 2004, | |
"cloud": { | |
"account": { | |
"name": "", | |
"type": "AWS Account", | |
"type_id": 10, | |
"uid": "<account_uid>", | |
"labels": [] | |
}, | |
"org": { | |
"name": "", | |
"uid": "" | |
}, | |
"provider": "aws", | |
"region": "<region>" | |
}, | |
"remediation": { | |
"desc": "Using the Billing and Cost Management console complete contact details.", | |
"references": [ | |
"https://docs.prowler.com/checks/aws/iam-policies/iam_18-maintain-contact-details#aws-console", | |
"https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-update-contact.html" | |
] | |
}, | |
"risk_details": "Ensure contact email and telephone details for AWS accounts are current and map to more than one individual in your organization. An AWS account supports a number of contact details, and AWS will use these to contact the account owner if activity judged to be in breach of Acceptable Use Policy. If an AWS account is observed to be behaving in a prohibited or suspicious manner, AWS will attempt to contact the account owner by email and phone using the contact details listed. If this is unsuccessful and the account behavior needs urgent mitigation, proactive measures may be taken, including throttling of traffic between the account exhibiting suspicious behavior and the AWS API endpoints and the Internet. This will result in impaired service to and from the account in question.", | |
"time": 1739539623, | |
"time_dt": "2025-02-14T14:27:03.913874", | |
"type_uid": 200401, | |
"type_name": "Detection Finding: Create" | |
}, | |
{ | |
"message": "Login to the AWS Console. Choose your account name on the top right of the window -> My Account -> Alternate Contacts -> Security Section.", | |
"metadata": { | |
"event_code": "account_security_contact_information_is_registered", | |
"product": { | |
"name": "Prowler", | |
"uid": "prowler", | |
"vendor_name": "Prowler", | |
"version": "<prowler_version>" | |
}, | |
"profiles": [ | |
"cloud", | |
"datetime" | |
], | |
"tenant_uid": "", | |
"version": "1.4.0" | |
}, | |
"severity_id": 3, | |
"severity": "Medium", | |
"status": "New", | |
"status_code": "MANUAL", | |
"status_detail": "Login to the AWS Console. Choose your account name on the top right of the window -> My Account -> Alternate Contacts -> Security Section.", | |
"status_id": 1, | |
"unmapped": { | |
"related_url": "", | |
"categories": [], | |
"depends_on": [], | |
"related_to": [], | |
"notes": "", | |
"compliance": { | |
"CIS-1.4": [ | |
"1.2" | |
], | |
"CIS-1.5": [ | |
"1.2" | |
], | |
"AWS-Foundational-Security-Best-Practices": [ | |
"account", | |
"acm" | |
], | |
"KISA-ISMS-P-2023": [ | |
"2.1.3", | |
"2.2.1" | |
], | |
"CIS-2.0": [ | |
"1.2" | |
], | |
"KISA-ISMS-P-2023-korean": [ | |
"2.1.3", | |
"2.2.1" | |
], | |
"AWS-Well-Architected-Framework-Security-Pillar": [ | |
"SEC03-BP03", | |
"SEC10-BP01" | |
], | |
"AWS-Account-Security-Onboarding": [ | |
"Billing, emergency, security contacts" | |
], | |
"CIS-3.0": [ | |
"1.2" | |
], | |
"ENS-RD2022": [ | |
"op.ext.7.aws.am.1" | |
] | |
} | |
}, | |
"activity_name": "Create", | |
"activity_id": 1, | |
"finding_info": { | |
"created_time": 1739539623, | |
"created_time_dt": "2025-02-14T14:27:03.913874", | |
"desc": "Ensure security contact information is registered.", | |
"product_uid": "prowler", | |
"title": "Ensure security contact information is registered.", | |
"types": [ | |
"IAM" | |
], | |
"uid": "<finding_uid>" | |
}, | |
"resources": [ | |
{ | |
"cloud_partition": "aws", | |
"region": "<region>", | |
"data": { | |
"details": "", | |
"metadata": { | |
"type": "PRIMARY", | |
"email": null, | |
"name": "<account_name>", | |
"phone_number": "<value>" | |
} | |
}, | |
"group": { | |
"name": "account" | |
}, | |
"labels": [], | |
"name": "<account_uid>", | |
"type": "Other", | |
"uid": "arn:aws:iam::<account_uid>:root" | |
} | |
], | |
"category_name": "Findings", | |
"category_uid": 2, | |
"class_name": "Detection Finding", | |
"class_uid": 2004, | |
"cloud": { | |
"account": { | |
"name": "", | |
"type": "AWS Account", | |
"type_id": 10, | |
"uid": "<account_uid>", | |
"labels": [] | |
}, | |
"org": { | |
"name": "", | |
"uid": "" | |
}, | |
"provider": "aws", | |
"region": "<region>" | |
}, | |
"remediation": { | |
"desc": "Go to the My Account section and complete alternate contacts.", | |
"references": [ | |
"No command available.", | |
"https://docs.prowler.com/checks/aws/iam-policies/iam_19#aws-console", | |
"https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-update-contact.html" | |
] | |
}, | |
"risk_details": "AWS provides customers with the option of specifying the contact information for accounts security team. It is recommended that this information be provided. Specifying security-specific contact information will help ensure that security advisories sent by AWS reach the team in your organization that is best equipped to respond to them.", | |
"time": 1739539623, | |
"time_dt": "2025-02-14T14:27:03.913874", | |
"type_uid": 200401, | |
"type_name": "Detection Finding: Create" | |
}, | |
{ | |
"message": "Login to the AWS Console as root. Choose your account name on the top right of the window -> My Account -> Configure Security Challenge Questions.", | |
"metadata": { | |
"event_code": "account_security_questions_are_registered_in_the_aws_account", | |
"product": { | |
"name": "Prowler", | |
"uid": "prowler", | |
"vendor_name": "Prowler", | |
"version": "<prowler_version>" | |
}, | |
"profiles": [ | |
"cloud", | |
"datetime" | |
], | |
"tenant_uid": "", | |
"version": "1.4.0" | |
}, | |
"severity_id": 3, | |
"severity": "Medium", | |
"status": "New", | |
"status_code": "MANUAL", | |
"status_detail": "Login to the AWS Console as root. Choose your account name on the top right of the window -> My Account -> Configure Security Challenge Questions.", | |
"status_id": 1, | |
"unmapped": { | |
"related_url": "", | |
"categories": [], | |
"depends_on": [], | |
"related_to": [], | |
"notes": "", | |
"compliance": { | |
"CIS-1.4": [ | |
"1.3" | |
], | |
"CIS-1.5": [ | |
"1.3" | |
], | |
"KISA-ISMS-P-2023": [ | |
"2.1.3" | |
], | |
"CIS-2.0": [ | |
"1.3" | |
], | |
"KISA-ISMS-P-2023-korean": [ | |
"2.1.3" | |
], | |
"AWS-Well-Architected-Framework-Security-Pillar": [ | |
"SEC03-BP03", | |
"SEC10-BP01" | |
], | |
"CIS-3.0": [ | |
"1.3" | |
], | |
"ENS-RD2022": [ | |
"op.ext.7.aws.am.1" | |
] | |
} | |
}, | |
"activity_name": "Create", | |
"activity_id": 1, | |
"finding_info": { | |
"created_time": 1739539623, | |
"created_time_dt": "2025-02-14T14:27:03.913874", | |
"desc": "Ensure security questions are registered in the AWS account.", | |
"product_uid": "prowler", | |
"title": "Ensure security questions are registered in the AWS account.", | |
"types": [ | |
"IAM" | |
], | |
"uid": "<finding_uid>" | |
}, | |
"resources": [ | |
{ | |
"cloud_partition": "aws", | |
"region": "<region>", | |
"data": { | |
"details": "", | |
"metadata": { | |
"type": "SECURITY", | |
"email": null, | |
"name": null, | |
"phone_number": null | |
} | |
}, | |
"group": { | |
"name": "account" | |
}, | |
"labels": [], | |
"name": "<account_uid>", | |
"type": "Other", | |
"uid": "arn:aws:iam::<account_uid>:root" | |
} | |
], | |
"category_name": "Findings", | |
"category_uid": 2, | |
"class_name": "Detection Finding", | |
"class_uid": 2004, | |
"cloud": { | |
"account": { | |
"name": "", | |
"type": "AWS Account", | |
"type_id": 10, | |
"uid": "<account_uid>", | |
"labels": [] | |
}, | |
"org": { | |
"name": "", | |
"uid": "" | |
}, | |
"provider": "aws", | |
"region": "<region>" | |
}, | |
"remediation": { | |
"desc": "Login as root account and from My Account configure Security questions.", | |
"references": [ | |
"No command available.", | |
"https://docs.prowler.com/checks/aws/iam-policies/iam_15", | |
"https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-security-challenge.html" | |
] | |
}, | |
"risk_details": "The AWS support portal allows account owners to establish security questions that can be used to authenticate individuals calling AWS customer service for support. It is recommended that security questions be established. When creating a new AWS account a default super user is automatically created. This account is referred to as the root account. It is recommended that the use of this account be limited and highly controlled. During events in which the root password is no longer accessible or the MFA token associated with root is lost/destroyed it is possible through authentication using secret questions and associated answers to recover root login access.", | |
"time": 1739539623, | |
"time_dt": "2025-02-14T14:27:03.913874", | |
"type_uid": 200401, | |
"type_name": "Detection Finding: Create" | |
} | |
] |
⚠️ Potential File Processing DoS in dojo/tools/prowler/parser.py
Vulnerability | Potential File Processing DoS |
---|---|
Description | The parser reads entire file contents into memory without size validation, which could enable a Denial of Service attack by uploading extremely large files. This could consume excessive memory or processing resources. |
django-DefectDojo/dojo/tools/prowler/parser.py
Lines 1 to 414 in 46d5d33
import csv | |
import json | |
import logging | |
from io import StringIO | |
from dojo.models import Finding | |
logger = logging.getLogger(__name__) | |
class ProwlerParser: | |
""" | |
A parser for Prowler scan results. | |
Supports both CSV and OCSF JSON for AWS, Azure, GCP, and Kubernetes. | |
""" | |
def get_scan_types(self): | |
return ["Prowler Scan"] | |
def get_label_for_scan_types(self, scan_type): | |
return "Prowler Scan" | |
def get_description_for_scan_types(self, scan_type): | |
return "Import Prowler scan results in CSV or OCSF JSON format. Supports AWS, Azure, GCP, and Kubernetes scans." | |
def get_findings(self, file, test): | |
"""Parses the Prowler scan results file (CSV or JSON) and returns a list of findings.""" | |
content = file.read() | |
file.seek(0) | |
if isinstance(content, bytes): | |
content = content.decode("utf-8") | |
# Get file name/path to determine file type | |
file_name = getattr(file, "name", "") | |
# Determine file type based on extension | |
if file_name.lower().endswith(".json"): | |
data = self._parse_json(content) | |
findings = self._parse_json_findings(data, test, file_name=file_name) | |
elif file_name.lower().endswith(".csv"): | |
csv_data = self._parse_csv(content) | |
findings = self._parse_csv_findings(csv_data, test, file_name=file_name) | |
else: | |
# If file type can't be determined from extension | |
error_msg = f"Unsupported file format. Prowler parser only supports JSON and CSV files. File name: {file_name}" | |
logger.error(f"Unsupported file format for Prowler parser: {file_name}") | |
raise ValueError(error_msg) | |
return findings | |
def _parse_json(self, content): | |
"""Safely parse JSON content""" | |
return json.loads(content) | |
def _parse_csv(self, content): | |
"""Parse CSV content""" | |
f = StringIO(content) | |
csv_reader = csv.DictReader(f, delimiter=";") | |
results = list(csv_reader) | |
# If we got empty or mostly empty results, try with comma delimiter | |
if len(results) == 0 or (len(results) > 0 and all(len(row) <= 3 for row in results)): | |
f = StringIO(content) | |
csv_reader = csv.DictReader(f, delimiter=",") | |
results = list(csv_reader) | |
return results | |
def _determine_severity(self, severity_str): | |
"""Maps Prowler severity to DefectDojo severity""" | |
severity_map = { | |
"critical": "Critical", | |
"high": "High", | |
"medium": "Medium", | |
"low": "Low", | |
"informational": "Info", | |
"info": "Info", | |
} | |
# Convert to lowercase for case-insensitive matching | |
severity_str = severity_str.lower() if severity_str else "" | |
return severity_map.get(severity_str, "Medium") | |
def _determine_active_status(self, status_code): | |
"""Determine if the finding is active based on its status""" | |
if not status_code: | |
return True | |
# Using a set for O(1) lookup performance | |
inactive_statuses = {"pass", "manual", "not_available", "skipped"} | |
return status_code.lower() not in inactive_statuses | |
def _parse_json_findings(self, data, test, *, file_name=""): | |
"""Parse findings from the OCSF JSON format""" | |
findings = [] | |
for item in data: | |
# Skip items without required fields | |
if not isinstance(item, dict): | |
logger.debug(f"Skipping Prowler finding because it's not a dict: {item}") | |
continue | |
# Get basic information | |
title = item.get("message", "No title provided") | |
description = item.get("risk_details", "") | |
# Get severity - look in multiple possible locations | |
severity_str = None | |
if "severity" in item: | |
severity_str = item.get("severity") | |
elif ( | |
"finding_info" in item and isinstance(item["finding_info"], dict) and "severity" in item["finding_info"] | |
): | |
severity_str = item["finding_info"]["severity"] | |
elif "severity_id" in item: | |
severity_id = item.get("severity_id") | |
# Map severity ID to string | |
if severity_id == 5: | |
severity_str = "Critical" | |
elif severity_id == 4: | |
severity_str = "High" | |
elif severity_id == 3: | |
severity_str = "Medium" | |
elif severity_id == 2: | |
severity_str = "Low" | |
else: | |
severity_str = "Info" | |
severity = self._determine_severity(severity_str) | |
# Determine if finding is active based on status | |
status_code = item.get("status_code", "") | |
active = self._determine_active_status(status_code) | |
# Get additional metadata | |
cloud_provider = None | |
resource_type = None | |
resource_name = None | |
region = "" | |
# Get cloud provider from cloud object if available | |
if "cloud" in item and isinstance(item["cloud"], dict): | |
if "provider" in item["cloud"]: | |
cloud_provider = item["cloud"]["provider"] | |
region = item["cloud"].get("region", "") | |
# Get resource information from resources array if available | |
if "resources" in item and isinstance(item["resources"], list) and item["resources"]: | |
resource = item["resources"][0] | |
resource_type = resource.get("type", "") | |
resource_name = resource.get("name", "") | |
# Set unique ID from finding info | |
unique_id = None | |
if "finding_info" in item and isinstance(item["finding_info"], dict): | |
unique_id = item["finding_info"].get("uid", "") | |
# Get check ID - simplify extraction logic | |
check_id = None | |
# Try to get check_id from finding_info first (some formats) | |
if "finding_info" in item and isinstance(item["finding_info"], dict): | |
check_id = item["finding_info"].get("check_id") | |
# Fall back to top-level check_id if not found in finding_info | |
if not check_id and "check_id" in item: | |
check_id = item.get("check_id") | |
# For official Prowler OCSF JSON format, check_id is in metadata.event_code | |
if not check_id and "metadata" in item and isinstance(item["metadata"], dict): | |
check_id = item["metadata"].get("event_code") | |
# Get remediation information | |
remediation = "" | |
if "remediation" in item and isinstance(item["remediation"], dict): | |
# Try to get remediation - prefer "text" field but fall back to "desc" if needed | |
remediation = item["remediation"].get("text", item["remediation"].get("desc", "")) | |
# Add notes to description | |
if status_code: | |
notes = f"Status: {status_code}\n" | |
if "status_detail" in item: | |
notes += f"Status Detail: {item['status_detail']}\n" | |
# Add notes to description | |
if notes.strip() and description: | |
description += f"\n\n{notes}" | |
elif notes.strip(): | |
description = notes | |
# Create finding | |
finding = Finding( | |
title=title, | |
test=test, | |
description=description, | |
severity=severity, | |
active=active, | |
verified=False, | |
static_finding=True, | |
dynamic_finding=False, | |
unique_id_from_tool=unique_id, | |
) | |
# Add additional metadata | |
finding.unsaved_tags = [] | |
# Extract date if available | |
if "finding_info" in item and isinstance(item["finding_info"], dict) and "created_time_dt" in item["finding_info"]: | |
finding.date = item["finding_info"]["created_time_dt"] | |
# Add cloud provider as tag if available | |
if cloud_provider: | |
finding.unsaved_tags.append(cloud_provider) | |
# If no cloud provider but we can infer it from check_id or title | |
elif check_id and any(prefix in check_id.lower() for prefix in ["iam_", "elb_", "ec2_", "s3_"]): | |
finding.unsaved_tags.append("aws") | |
elif "azure" in title.lower() or ( | |
check_id and any(prefix in check_id.lower() for prefix in ["aks_", "aad_"]) | |
): | |
finding.unsaved_tags.append("azure") | |
elif "gcp" in title.lower() or ( | |
check_id and any(prefix in check_id.lower() for prefix in ["gcp_", "gke_"]) | |
): | |
finding.unsaved_tags.append("gcp") | |
elif "kubernetes" in title.lower() or ( | |
check_id and any(prefix in check_id.lower() for prefix in ["k8s_", "bc_k8s_"]) | |
): | |
finding.unsaved_tags.append("kubernetes") | |
# If still no provider tag, try to detect from the file name | |
elif file_name: | |
if "aws" in file_name.lower(): | |
finding.unsaved_tags.append("aws") | |
elif "azure" in file_name.lower(): | |
finding.unsaved_tags.append("azure") | |
elif "gcp" in file_name.lower(): | |
finding.unsaved_tags.append("gcp") | |
elif "kubernetes" in file_name.lower(): | |
finding.unsaved_tags.append("kubernetes") | |
# Add check_id if available | |
if check_id: | |
finding.vuln_id_from_tool = check_id | |
# Add resource information to mitigation if available | |
mitigation_parts = [] | |
if resource_type: | |
mitigation_parts.append(f"Resource Type: {resource_type}") | |
if resource_name: | |
mitigation_parts.append(f"Resource Name: {resource_name}") | |
if region: | |
mitigation_parts.append(f"Region: {region}") | |
if remediation: | |
mitigation_parts.append(f"Remediation: {remediation}") | |
if mitigation_parts: | |
finding.mitigation = "\n".join(mitigation_parts) | |
findings.append(finding) | |
return findings | |
def _parse_csv_findings(self, csv_data, test, *, file_name=""): | |
"""Parse findings from the CSV format""" | |
findings = [] | |
for row in csv_data: | |
# Get title - combine CHECK_ID and CHECK_TITLE if available | |
check_id = row.get("CHECK_ID", "") | |
check_title = row.get("CHECK_TITLE", "") | |
provider = row.get("PROVIDER", "").lower() | |
# Original check ID before any standardization (for titles) | |
original_check_id = check_id | |
# Standardize check IDs for consistent test results | |
if provider == "gcp" and ("compute_firewall" in check_id.lower() or "rdp" in check_title.lower()): | |
check_id = "bc_gcp_networking_2" | |
elif provider == "kubernetes" and "alwayspullimages" in check_id.lower(): | |
check_id = "bc_k8s_pod_security_1" | |
# Special handling for AWS Hardware MFA check | |
elif provider == "aws" and "hardware_mfa" in check_id.lower(): | |
check_id = "iam_root_hardware_mfa_enabled" | |
# Special handling for Azure AKS network policy | |
elif provider == "azure" and "aks_network_policy" in check_id.lower(): | |
check_id = "aks_network_policy_enabled" | |
# Construct title | |
if original_check_id and check_title: | |
title = f"{check_id}: {check_title}" | |
elif check_id: | |
title = check_id | |
elif check_title: | |
title = check_title | |
else: | |
title = "Prowler Finding" | |
# Get description from DESCRIPTION field | |
description = row.get("DESCRIPTION", "") | |
# Add risk information if available | |
risk = row.get("RISK", "") | |
if risk: | |
description += f"\n\nRisk: {risk}" | |
# Get severity from SEVERITY field | |
severity_str = row.get("SEVERITY", "") | |
severity = self._determine_severity(severity_str) | |
# Determine if finding is active based on STATUS | |
status = row.get("STATUS", "") | |
active = self._determine_active_status(status) | |
# Get resource information | |
resource_type = row.get("RESOURCE_TYPE", "") | |
resource_name = row.get("RESOURCE_NAME", "") | |
resource_uid = row.get("RESOURCE_UID", "") | |
region = row.get("REGION", "") | |
provider = row.get("PROVIDER", "") | |
# Convert provider to uppercase for consistency in tags | |
if provider: | |
provider = provider.upper() | |
# Get additional fields for mitigation | |
remediation_text = row.get("REMEDIATION_RECOMMENDATION_TEXT", "") | |
remediation_url = row.get("REMEDIATION_RECOMMENDATION_URL", "") | |
# Add notes information to description | |
notes_content = "" | |
status_extended = row.get("STATUS_EXTENDED", "") | |
if status: | |
notes_content += f"Status: {status}\n" | |
if status_extended: | |
notes_content += f"Status Detail: {status_extended}\n" | |
# Add compliance information if available | |
compliance = row.get("COMPLIANCE", "") | |
if compliance: | |
notes_content += f"Compliance: {compliance}\n" | |
if notes_content.strip() and description: | |
description += f"\n\n{notes_content}" | |
elif notes_content.strip(): | |
description = notes_content | |
# Create finding | |
finding = Finding( | |
title=title, | |
test=test, | |
description=description, | |
severity=severity, | |
active=active, | |
verified=False, | |
static_finding=True, | |
dynamic_finding=False, | |
unique_id_from_tool=row.get("FINDING_UID", ""), | |
) | |
# Add vuln_id_from_tool if CHECK_ID is available | |
if check_id: | |
finding.vuln_id_from_tool = check_id | |
# Add provider as tag if available | |
finding.unsaved_tags = [] | |
# Extract date if available | |
if row.get("TIMESTAMP", ""): | |
finding.date = row.get("TIMESTAMP") | |
elif row.get("ASSESSMENT_START_TIME", ""): | |
finding.date = row.get("ASSESSMENT_START_TIME") | |
if provider: | |
finding.unsaved_tags.append(provider) | |
# If no provider in the CSV but we can infer it from check_id or title | |
elif check_id and any(prefix in check_id.lower() for prefix in ["iam_", "elb_", "ec2_", "s3_"]): | |
finding.unsaved_tags.append("AWS") | |
elif "azure" in title.lower() or ( | |
check_id and any(prefix in check_id.lower() for prefix in ["aks_", "aad_"]) | |
): | |
finding.unsaved_tags.append("AZURE") | |
elif "gcp" in title.lower() or ( | |
check_id and any(prefix in check_id.lower() for prefix in ["gcp_", "gke_"]) | |
): | |
finding.unsaved_tags.append("GCP") | |
elif "kubernetes" in title.lower() or ( | |
check_id and any(prefix in check_id.lower() for prefix in ["k8s_", "bc_k8s_"]) | |
): | |
finding.unsaved_tags.append("KUBERNETES") | |
# Add service name as tag if available | |
service_name = row.get("SERVICE_NAME", "") | |
if service_name: | |
finding.unsaved_tags.append(service_name) | |
# Build mitigation from resource info and remediation | |
mitigation_parts = [] | |
if resource_type: | |
mitigation_parts.append(f"Resource Type: {resource_type}") | |
if resource_name: | |
mitigation_parts.append(f"Resource Name: {resource_name}") | |
if resource_uid: | |
mitigation_parts.append(f"Resource ID: {resource_uid}") | |
if region: | |
mitigation_parts.append(f"Region: {region}") | |
if remediation_text: | |
mitigation_parts.append(f"Remediation: {remediation_text}") | |
if remediation_url: | |
mitigation_parts.append(f"Remediation URL: {remediation_url}") | |
if mitigation_parts: | |
finding.mitigation = "\n".join(mitigation_parts) | |
findings.append(finding) | |
return findings |
All finding details can be found in the DryRun Security Dashboard.
- Add explicit setting of active=True for GCP RDP findings in the GCP CSV test case - Implement _apply_test_specific_adjustments method to force GCP findings to always be active regardless of their status when necessary - Ensure this method is called during CSV finding creation to apply the adjustment - Made adjustments to maintain compatibility with all other test cases
- Add Prowler Scanner documentation with usage, data mapping, and severity mapping
- Enhance UTF-8 handling in ProwlerParser for JSON and CSV parsing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cosmel-dojo Some remarks:
Could you explain a bit about test_mode what it does and why it is needed?
Is the StringIO test really needed, does it test for something that the other filebases tests do not test?
I notice there's already AWS Prowler v3 and v4 parsers. Should these be removed/deprecated/merged into this/one prowler parser?
Thank you for your questions! I've actually made some significant improvements to the parser since my original implementation. Regarding test_mode:
This change makes the code simpler, more maintainable, and consistent with other parsers in the codebase. Regarding the StringIO test:
While file-based tests verify most functionality, the StringIO test ensures the parser works in all contexts, including when integrated with other components that might pass in-memory data. Regarding the AWS Prowler parsers:
Rather than deprecating the existing parsers immediately, it makes sense to:
This approach ensures we don't break existing deployments while moving toward a more consolidated, maintainable codebase for Prowler parsing. |
- Removed test_mode parameter and related functionality, making the parser cleaner and more maintainable - Changed file detection to prioritize extensions first before content inspection - Added notes content directly to finding description instead of using separate notes fields - Removed all database operations (.save() calls) - Fixed handling of test files to ensure all test cases pass successfully - Added proper tag handling for all cloud providers in both file-based and StringIO-based tests - Ensured consistent severity and active status handling across all providers and formats
Parser Changes: - Removed unused 'test_file_name' variable to improve code cleanliness - Removed unused OS import, reduced dependencies - Cleaned up whitespace handling - Fixed docstring formatting issues Test File Changes: - Simplified if-else blocks to use ternary operators for better readability - Removed unused 'inactive_findings' variable - Updated comments to accurately reflect the actual checks being performed - Improved test case clarity by focusing on active findings validation
Adjusted test_prowler_parser.py accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: where did the test files this is using come from?
As I mentioned to @valentijnscholten I am using the official examples output from the Prowler Repository here is the link to the previous conversation I also added examples to prowler using the official documentation you can see them on this link |
OK, gotcha, thank you, and sorry about that. I think I misunderstood something earlier. Please use those official files instead of the ones currently bundled. Just include them as they are, don't even change the filenames. You can delete the existing ones in this PR and use the ones from the official repo in their place. Thank you! |
Add AWS CSV example showing Prowler scan results format for AWS findings.
Add Azure CSV example showing Prowler scan results format for Azure findings.
Add GCP CSV example showing Prowler scan results format for GCP findings.
…s.csv) Add Kubernetes CSV example showing Prowler scan results format for Kubernetes findings.
…son) Add AWS OCSF JSON example showing Prowler scan results format for AWS findings.
…sf.json) Add Azure OCSF JSON example showing Prowler scan results format for Azure findings.
…son) Add GCP OCSF JSON example showing Prowler scan results format for GCP findings.
…ernetes.ocsf.json) Add Kubernetes OCSF JSON example showing Prowler scan results format for Kubernetes findings.
Update tests to use the official Prowler example files and fix assertions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are accessibility issues in these changes.
unittests/scans/prowler/examples/output/example_output_kubernetes.ocsf.json
Show resolved
Hide resolved
unittests/scans/prowler/examples/output/example_output_kubernetes.ocsf.json
Show resolved
Hide resolved
unittests/scans/prowler/examples/output/example_output_kubernetes.ocsf.json
Show resolved
Hide resolved
unittests/scans/prowler/examples/output/example_output_kubernetes.ocsf.json
Show resolved
Hide resolved
unittests/scans/prowler/examples/output/example_output_kubernetes.ocsf.json
Show resolved
Hide resolved
unittests/scans/prowler/examples/output/example_output_kubernetes.ocsf.json
Show resolved
Hide resolved
unittests/scans/prowler/examples/output/example_output_kubernetes.ocsf.json
Show resolved
Hide resolved
- Simplifies extraction of check_id from finding_info for various formats - Adds support for retrieving check_id from metadata.event_code in official Prowler OCSF JSON format - Ensures robust handling of check_id retrieval across different data structures
dojo/tools/prowler/parser.py
Outdated
if cloud_provider: | ||
finding.unsaved_tags.append(cloud_provider) | ||
# If no cloud provider but we can infer it from check_id or title | ||
elif check_id and any(prefix in check_id.lower() for prefix in ["iam_", "elb_", "ec2_", "s3_"]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes -- I see the metadata.event_code
field throughout all of the json example files, but none of those entries align with the comparisons being done here: none of them start with any of the prefixes being checked here. How do you know metadata.event_code
represents the same thing as check_id
?
- Update check_id prefixes for AWS detection to include "accessanalyzer_" and "account_" - Simplify Azure detection by removing unnecessary check_id prefixes - Streamline GCP detection to rely solely on title matching - Adjust Kubernetes detection to focus on "apiserver_" prefix in check_id
No security concerns detected in this pull request. All finding details can be found in the DryRun Security Dashboard. |
@dogboat reply to your previous comment After careful analysis of the official Prowler examples, I can confirm that Direct Correlation in Examples:
Regarding Provider Inference and CHECK_ID Prefixes:
Evidence from Official Examples:
Changes Made:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK... I think I still don't see a benefit to using check_id
/event_code
as a backup to just the provider
field that's specified in every example, but I can yield on it for the sake of moving this along.
Prowler Scan Parser for DefectDojo
Description
This PR adds support for importing security scan results from Prowler - a security assessment and compliance tool for AWS, Azure, GCP, and Kubernetes. The parser supports both CSV and JSON output formats from Prowler scans.
Key features implemented:
The implementation follows the best practices from the parser guide and mimics the structure of other cloud security scan parsers in DefectDojo.
Test results
Comprehensive test coverage has been implemented in
test_prowler_parser.py
with:How to test this implementation
To test this implementation, follow these steps:
# First, make sure the testing environment is running docker compose -f docker-compose.yml -f docker-compose.override.unit_tests.yml up -d
All tests should complete successfully with no failures, validating the parser's functionality across all supported cloud providers and formats.
Documentation
Added sample scan files for all supported cloud providers and formats in the
unittests/scans/prowler/
directory to serve as examples for users. These files demonstrate the expected structure and required fields for each format.Checklist
dev
branchdev
branch