-
Notifications
You must be signed in to change notification settings - Fork 1.7k
ReversingLabs SpectraAssure rl-json parser for DefectDojo #12579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
This pull request contains a potential security vulnerability in logging practices, specifically in the ReversingLabs SpectraAssure tool, where debug logging may expose sensitive internal system details that could be exploited if logging configurations are not properly secured in production environments.
Logging Information Exposure in
|
Vulnerability | Logging Information Exposure |
---|---|
Description | Extensive use of debug logging with detailed internal information could potentially expose system details if logging is misconfigured in production. Sensitive information like file paths, UUIDs, and processing details are logged without sufficient safeguards. |
django-DefectDojo/dojo/tools/reversinglabs_spectraassure/rlJsonInfo/__init__.py
Lines 1 to 594 in 5311b14
import copy | |
import datetime | |
import gc | |
import json | |
import logging | |
import os | |
import sys | |
from typing import Any | |
from .cve_info_node import CveInfoNode | |
logger = logging.getLogger(__name__) | |
""" | |
The info of the rl.json reports but cut up in usable parts | |
""" | |
class RlJsonInfo: | |
SCAN_TOOL_NAME: str = "ReversingLabs SpectraAssure" | |
info: dict[str, Any] | |
assessments: dict[str, Any] | |
components: dict[str, Any] | |
cryptography: dict[str, Any] | |
dependencies: dict[str, Any] | |
indicators: dict[str, Any] | |
licenses: dict[str, Any] | |
ml_models: dict[str, Any] | |
services: dict[str, Any] | |
secrets: dict[str, Any] | |
violations: dict[str, Any] | |
vulnerabilities: dict[str, Any] | |
_rest: dict[str, Any] | |
_metadata: list[str] = [ | |
"assessments", | |
"components", | |
"cryptography", | |
"dependencies", | |
"indicators", | |
"licenses", | |
"ml_models", | |
"services", | |
"secrets", | |
"violations", | |
"vulnerabilities", | |
] | |
sMap: dict[int, str] = { | |
1: "Info", | |
2: "Low", | |
3: "Medium", | |
4: "High", | |
5: "Critical", | |
} | |
tags: dict[str, str] = { | |
"FIXABLE": "Fix Available", | |
"EXISTS": "Exploit Exists", | |
"MALWARE": "Exploited by Malware", | |
"MANDATE": "Patching Mandated", | |
"UNPROVEN": "CVE Discovered", | |
} | |
# sort order, to align wit Spectra Assure Portal | |
# 1: Fix Available | |
# 2: Exploit exists | |
# 3: Exploited my malware | |
# 4: Patch mandated | |
impact_sort_order: list[str] = [ | |
"Fix Available", | |
"Exploit Exists", | |
"Exploited by Malware", | |
"Patching Mandated", | |
"CVE Discovered", | |
] | |
# cve, comp_uuid, dep_uuid | None -> CveInfoNode | |
_results: dict[str, dict[str, dict[str | None, CveInfoNode]]] | |
def __init__( | |
self, | |
file_handle: Any, | |
) -> None: | |
self.file_name: str = file_handle.name | |
logger.debug("file: %s", self.file_name) | |
self.data: dict[str, Any] = json.load(file_handle) | |
self._results = {} | |
self.RL_JSON_WITH_CG_COLLECT: bool = False | |
if os.getenv("RL_JSON_WITH_CG_COLLECT"): | |
self.RL_JSON_WITH_CG_COLLECT = True | |
self._get_info() | |
self._get_meta() | |
self._get_rest() | |
if self.RL_JSON_WITH_CG_COLLECT is True: | |
gc.collect() | |
def _get_info( | |
self, | |
) -> None: | |
logger.debug("") | |
r = self.data.get("report", {}) | |
k = "info" | |
if k in r: | |
self.info = copy.deepcopy(r.get(k, {})) | |
del r[k] | |
def _get_meta( | |
self, | |
) -> None: | |
logger.debug("") | |
r = self.data.get("report", {}) | |
m = r.get("metadata", {}) | |
for name in self._metadata: | |
if name in m: | |
setattr( | |
self, | |
name, | |
copy.deepcopy(m.get(name, {})), | |
) | |
del m[name] | |
if len(m) == 0: | |
del r["metadata"] | |
if len(r) == 0: | |
del self.data["report"] | |
def _get_rest( | |
self, | |
) -> None: | |
logger.debug("") | |
self._rest = copy.deepcopy(self.data) | |
self.data = {} | |
def _find_sha256_in_components( | |
self, | |
sha256: str, | |
) -> bool: | |
logger.debug("") | |
for component in self.components.values(): | |
comp_sha256 = self._get_sha256(data=component) | |
if comp_sha256 == sha256: | |
return True | |
return False | |
def _add_to_results( | |
self, | |
cve: str, | |
comp_uuid: str, | |
dep_uuid: str | None, | |
cin: CveInfoNode | None, | |
) -> None: | |
logger.debug("") | |
if cin is None: | |
return | |
rr = self._results | |
if cve not in rr: | |
rr[cve] = {} | |
if comp_uuid not in rr[cve]: | |
rr[cve][comp_uuid] = {} | |
if dep_uuid not in rr[cve][comp_uuid]: | |
rr[cve][comp_uuid][dep_uuid] = cin | |
def _get_sha256( | |
self, | |
data: dict[str, Any], | |
) -> str: | |
logger.debug("") | |
k = "sha256" | |
h = data.get("hashes", []) | |
for item in h: | |
if item[0] == k: | |
return str(item[1]) | |
logger.error("no '%s' found for this item %s", k, data) | |
return "" | |
def _verify_file_is_also_component( | |
self, | |
) -> bool: | |
logger.debug("") | |
rr: bool = False | |
f_info: dict[str, Any] = self.info.get("file", {}) | |
file_sha256 = self._get_sha256(f_info) | |
rr = self._find_sha256_in_components(file_sha256) | |
if rr is False: | |
logger.error("file cannot be found as component: %s", f_info) | |
return rr | |
def _score_to_severity( | |
self, | |
score: float, | |
) -> str: | |
logger.debug("") | |
if score >= 9: | |
return self.sMap[5] | |
if score >= 7: | |
return self.sMap[4] | |
if score >= 4: | |
return self.sMap[3] | |
if score > 0: | |
return self.sMap[2] | |
return self.sMap[1] | |
def _use_path_or_name( | |
self, | |
*, | |
data: dict[str, Any], | |
purl: str, | |
name_first: bool = False, | |
prefer_path: bool = True, | |
) -> str: | |
logger.debug("") | |
path = data.get("path", "") | |
name = data.get("name", "") | |
if name_first and len(name) > 0: | |
return str(name) | |
if prefer_path and len(path) > 0: | |
return str(path) | |
# if we have a valid purl | |
# prefer to derive the name from the purl | |
if purl and len(purl) > 0 and "@" in purl: | |
s = purl | |
if "/" in s: | |
ii = purl.index("/") | |
s = purl[ii + 1 :] | |
aa = s.split("@") | |
name = aa[0] | |
# version = aa[1] | |
return str(name) | |
k = "" | |
if name_first is False: | |
if path != "": | |
return str(path) | |
if name != "": | |
return str(name) | |
return k | |
if name != "": | |
return str(name) | |
if path != "": | |
return str(path) | |
return k | |
def _get_tags_from_cve(self, this_cve: dict[str, Any]) -> list[str]: | |
tags: list[str] = [] | |
exploit = this_cve.get("exploit", []) | |
if len(exploit) == 0: | |
return tags | |
for key in exploit: | |
tag = self.tags.get(key) | |
if tag is None: | |
logger.warning("missing tag for key: %s", key) | |
continue | |
tags.append(tag) | |
return tags | |
def _make_impact_from_tags( | |
self, | |
tags: list[str], | |
impact: str | None, | |
) -> str: | |
if impact is None: | |
impact = "" | |
for tag in self.impact_sort_order: | |
if tag in tags: | |
impact += tag + "\n" | |
return impact | |
def _make_new_cin( | |
self, | |
cve: str, | |
comp_uuid: str, | |
dep_uuid: str | None, | |
active: Any, | |
) -> CveInfoNode | None: | |
"""Collect all info we can extract from the cve""" | |
logger.debug("") | |
this_cve = self.vulnerabilities.get(cve) | |
if this_cve is None: | |
logger.error("missing cve info for: %s", cve) | |
return None | |
cin = CveInfoNode() | |
cin.cve = cve | |
cin.comp_uuid = comp_uuid | |
cin.dep_uuid = dep_uuid | |
cin.active = bool(active) | |
f_info: dict[str, Any] = self.info.get("file", {}) | |
cin.original_file = str(f_info.get("name", "")) | |
cin.original_file_sha256 = self._get_sha256(f_info) | |
cin.scan_date = datetime.datetime.fromisoformat(self._rest["timestamp"]).date() | |
cin.scan_tool = self.SCAN_TOOL_NAME | |
cin.scan_tool_version = self._rest.get("version", "no_scan_tool_version_specified") | |
cin.cvss_version = int(this_cve.get("cvss", {}).get("version", "0")) | |
score = float(this_cve.get("cvss", {}).get("baseScore", "0.0")) | |
cin.score = score | |
cin.score_severity = self._score_to_severity(score=score) | |
# TODO: tags | |
cin.tags = self._get_tags_from_cve(this_cve) | |
cin.impact = self._make_impact_from_tags(cin.tags, cin.impact) | |
return cin | |
def _get_component_purl( | |
self, | |
component: dict[str, Any], | |
) -> str: | |
return str(component.get("identity", {}).get("purl", "")) | |
def _get_dependency_purl( | |
self, | |
dependency: dict[str, Any], | |
) -> str: | |
return str(dependency.get("purl", "")) | |
def _do_one_cve_component_dependency( | |
self, | |
comp_uuid: str, | |
component: dict[str, Any], | |
dep_uuid: str, | |
dependency: dict[str, Any], | |
cve: str, | |
active: Any, | |
) -> CveInfoNode | None: | |
logger.debug("comp: %s; dep: %s; cve: %s", comp_uuid, dep_uuid, cve) | |
cin = self._make_new_cin( | |
cve=cve, | |
active=active, | |
comp_uuid=comp_uuid, | |
dep_uuid=dep_uuid, | |
) | |
if cin is None: | |
return None | |
ident = component.get("identity", {}) | |
c_purl = self._get_component_purl(component=component) | |
cin.component_file_path = self._use_path_or_name(data=component, purl=c_purl) | |
cin.component_file_sha256 = self._get_sha256(data=component) | |
cin.component_file_purl = c_purl | |
cin.component_file_version = ident.get("version", "") | |
cin.component_file_name = component.get("name", "") | |
cin.component_type = "dependency" | |
cin.component_name = dependency.get("product", f"no_{cin.component_type}_product_provided") | |
cin.component_version = dependency.get("version", f"no_{cin.component_type}_version_provided") | |
d_purl = self._get_dependency_purl(dependency=dependency) | |
cin.component_purl = d_purl | |
cin.make_title_cin(cve=cve) | |
cin.make_description_cin(cve=cve, purl=d_purl) | |
cin.vuln_id_from_tool = cve | |
# should be constant over multiple re-scans (repeatable) | |
# see: https://github.com/DefectDojo/django-DefectDojo/pull/12463 | |
# TODO: use component sha256 + dep.name, dep version + cve | |
dep_purl = dependency.get("purl", "") | |
dep_name = dependency.get("product", "") | |
dep_version = dependency.get("version", "") | |
# if we have a dependency purl then purl, otherwise component product + version | |
tail = dep_purl | |
if len(tail) == 0: | |
tail = f"{dep_name}@{dep_version}" | |
cin.unique_id_from_tool = f"{cin.component_file_sha256}:{cve}:{tail}" | |
logger.debug("%s", cin) | |
return cin | |
def _do_one_cve_component_without_dependencies( | |
self, | |
comp_uuid: str, | |
component: dict[str, Any], | |
cve: str, | |
active: Any, | |
) -> CveInfoNode | None: | |
logger.debug("comp: %s; cve: %s", comp_uuid, cve) | |
cin = self._make_new_cin( | |
cve=cve, | |
active=active, | |
comp_uuid=comp_uuid, | |
dep_uuid=None, | |
) | |
if cin is None: | |
return None | |
ident = component.get("identity", {}) | |
c_purl = self._get_component_purl(component=component) | |
cin.component_file_path = self._use_path_or_name(data=component, purl=c_purl) | |
cin.component_file_sha256 = self._get_sha256(data=component) | |
cin.component_file_purl = c_purl | |
cin.component_file_version = ident.get("version", "") | |
cin.component_file_name = component.get("name", "") | |
cin.component_type = "component" | |
cin.component_name = self._use_path_or_name(data=component, purl=c_purl, name_first=True) | |
cin.component_version = ident.get("version", "") | |
cin.component_purl = c_purl | |
cin.make_title_cin(cve=cve) | |
cin.make_description_cin(cve=cve, purl=c_purl) | |
cin.vuln_id_from_tool = cve | |
# should be constant over multiple re-scans (repeatable) | |
# see: https://github.com/DefectDojo/django-DefectDojo/pull/12463 | |
cin.unique_id_from_tool = f"{cin.component_file_sha256}:{cve}" | |
logger.debug("%s", cin) | |
return cin | |
def _get_one_active_cve_component_dependency( | |
self, | |
comp_uuid: str, | |
component: dict[str, Any], | |
dep_uuid: str, | |
) -> None: | |
logger.debug("") | |
dependency = self.dependencies.get(dep_uuid) | |
if dependency is None: | |
logger.error("missing dependency: %s", dep_uuid) | |
return | |
# ------------------------------- | |
v = dependency.get("vulnerabilities") | |
if v is None: | |
logger.info("no vulnerabilities for dependency: %s", dep_uuid) | |
return | |
# ------------------------------- | |
for cve in v.get("active"): | |
cin = self._do_one_cve_component_dependency( | |
comp_uuid=comp_uuid, | |
component=component, | |
dep_uuid=dep_uuid, | |
dependency=dependency, | |
cve=cve, | |
active=True, | |
) | |
self._add_to_results( | |
cve=cve, | |
comp_uuid=comp_uuid, | |
dep_uuid=dep_uuid, | |
cin=cin, | |
) | |
def _get_all_active_cve_on_components_without_dependencies( | |
self, | |
) -> None: | |
logger.debug("") | |
for comp_uuid, component in self.components.items(): | |
v = component.get("identity", {}).get("vulnerabilities", None) | |
if v is None: | |
logger.info("no vulnerabilities for component: %s", comp_uuid) | |
continue | |
for cve in v.get("active", []): | |
cin = self._do_one_cve_component_without_dependencies( | |
comp_uuid=comp_uuid, | |
component=component, | |
cve=cve, | |
active=True, | |
) | |
self._add_to_results( | |
cve=cve, | |
comp_uuid=comp_uuid, | |
dep_uuid=None, | |
cin=cin, | |
) | |
def _get_all_active_cve_on_components_with_dependencies( | |
self, | |
) -> None: | |
logger.debug("") | |
for comp_uuid, component in self.components.items(): | |
d = component.get("identity", {}).get("dependencies", None) | |
if d is None: | |
logger.info("no dependencies for component: %s", comp_uuid) | |
continue | |
for dep_uuid in d: | |
# returns one dep_uuid, multiple cve (if any cve) | |
self._get_one_active_cve_component_dependency( | |
comp_uuid=comp_uuid, | |
component=component, | |
dep_uuid=dep_uuid, | |
) | |
# ==== PUBLIC ====== | |
def get_results_list(self) -> list[CveInfoNode]: | |
rr: list[CveInfoNode] = [] | |
for compo in self._results.values(): | |
for component in compo.values(): | |
for cin in component.values(): | |
rr.append(cin) | |
return rr | |
def print_results_to_file_or_stdout( | |
self, | |
file_handle: Any = sys.stdout, | |
) -> None: | |
def default(o: Any) -> Any: | |
if type(o) is CveInfoNode: | |
return o.__dict__ | |
if type(o) is datetime.date: | |
return o.isoformat() # YYYY-MM-DD | |
if type(o) is datetime.datetime: | |
return o.isoformat() # YYYY-MM-DD T hh:mm:ss <tz info> | |
msg: str = f"unsupported type: {type(o)}" | |
raise Exception(msg) | |
rr: list[Any] = self.get_results_list() | |
print( | |
json.dumps( | |
rr, | |
indent=4, | |
sort_keys=True, | |
default=default, | |
), | |
file=file_handle, | |
) | |
def get_cve_active_all(self) -> None: | |
""" | |
Find get all cve's and add componenet path, sha and version like in `report.cve.csv` | |
0: | |
verify that the info -> file sha256 comes back as a component | |
A: | |
walk over components with active vulnerabilities | |
B: | |
walk over components -> dependencies with active vulnerabilities | |
""" | |
logger.debug("") | |
self.file_is_component = self._verify_file_is_also_component() | |
self._get_all_active_cve_on_components_without_dependencies() | |
self._get_all_active_cve_on_components_with_dependencies() |
All finding details can be found in the DryRun Security Dashboard.
after bot complain added 'title: ...' to the doc file |
fixed the readme after the checker complained |
my local hugo extended test now shows no errors |
|
||
logger = logging.getLogger(__name__) | ||
|
||
WHAT = "ReversingLabs Spectra Assure" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you give this a clearer name?
cin.score = score | ||
cin.score_severity = self._score_to_severity(score=score) | ||
|
||
# TODO: tags |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still a TODO?
if len(tail) == 0: | ||
tail = f"{dep_name}@{dep_version}" | ||
|
||
cin.unique_id_from_tool = f"{cin.component_file_sha256}:{cve}:{tail}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the unique_id_from_tool
value should be something that is in the report, not a computed/constructed value. Can you look at this?
Feature: ReversingLabs SpectraAssure rl-json parser for DefectDojo
Documentation and unittests provded according to the plugin documentation
This adds a new parser for the rl-json format produced by the SpectraAssure cli or portal.