Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ class CVEDB:
PRIMARY KEY(metrics_id)
)
""",
"deduplication": """
CREATE TABLE IF NOT EXISTS deduplication (
purl TEXT,
vendor TEXT,
PRIMARY KEY (purl, vendor)
)
""",
}

EMPTY_SELECT_QUERIES = {
Expand Down Expand Up @@ -392,20 +399,25 @@ def init_database(self) -> None:
exploit_table_create,
cve_metrics_table_create,
metrics_table_create,
deduplication,
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
self.TABLE_SCHEMAS["deduplication"],
)
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
index_purl = "CREATE INDEX IF NOT EXISTS purl_index ON deduplication (purl)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
cursor.execute(exploit_table_create)
cursor.execute(cve_metrics_table_create)
cursor.execute(metrics_table_create)
cursor.execute(deduplication)
cursor.execute(index_range)
cursor.execute(index_purl)

(
severity_schema,
Expand Down
72 changes: 60 additions & 12 deletions cve_bin_tool/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, cve_db, logger):
self.logger = logger
self.filename = ""
self.purl_pkg_type = "default"
self.connections = {}

def run_checker(self, filename):
"""
Expand Down Expand Up @@ -117,7 +118,7 @@ def find_vendor_from_purl(self, purl, ver) -> tuple[list[ScanInfo], bool]:
UNION
SELECT cpe from purl2cpe WHERE purl LIKE ?
"""
cursor = self.db_open_and_get_cursor()
cursor = self.db_open_and_get_cursor("purl2cpe/purl2cpe.db")
cursor.execute(query, (param1, param2))
cpeList = cursor.fetchall()
vendorlist: list[ScanInfo] = []
Expand Down Expand Up @@ -147,22 +148,69 @@ def find_vendor_from_purl(self, purl, ver) -> tuple[list[ScanInfo], bool]:

return vendorlist, True
except Exception as e:
self.logger.error(f"Error occurred: {e}")
self.logger.debug(f"Error occurred: {e}")
self.logger.error("Unable to access purl2cpe database.")
return [], False

def db_open_and_get_cursor(self) -> sqlite3.Cursor:
"""Opens connection to sqlite database, returns cursor object."""
def deduplication(self, purl, vendorlist) -> list[ScanInfo]:
"""
Modifies invalid vendors associated with a given PURL using the deduplication database.

dbpath = (
Path("~").expanduser() / ".cache" / "cve-bin-tool" / "purl2cpe/purl2cpe.db"
)
connection = sqlite3.connect(dbpath)
It queries the database for vendors associated with the PURL and filters the input 'vendorlist'
accordingly:

- If a vendor from 'vendorlist' is found in the database (valid vendor), it is added directly
to 'vendorlist_filtered'.
- If a vendor from 'vendorlist' is not found in the database (invalid vendor), a new ScanInfo
object is created with the vendor marked as 'UNKNOWN' and added to 'vendorlist_filtered'.

"""
try:
purl = purl.to_dict()
param = f"pkg:{purl['type']}/{purl['name']}"
query = """
SELECT vendor FROM deduplication WHERE purl LIKE ?
"""
vendorlist_filtered: list[ScanInfo] = []
cursor = self.db_open_and_get_cursor("cve.db")
cursor.execute(query, (param,))

invalidVendorList = [i[0] for i in cursor.fetchall()]

for item in vendorlist:
if item.product_info.vendor not in invalidVendorList:
vendorlist_filtered.append(item)

if len(vendorlist_filtered) == 0:
vendorlist_filtered.append(
ScanInfo(
ProductInfo(
"UNKNOWN",
item.product_info.product,
item.product_info.version,
item.file_path,
item.product_info.purl,
),
item.file_path,
)
)
return vendorlist_filtered
except Exception as e:
self.logger.debug(f"error: {e}")
self.logger.error("Unable to access deduplication database.")
return vendorlist

def db_open_and_get_cursor(self, dbname) -> sqlite3.Cursor:
"""Opens connection to sqlite database, returns cursor object."""

if connection is not None:
cursor = connection.cursor()
if cursor is None:
dbpath = Path("~").expanduser() / ".cache" / "cve-bin-tool" / dbname
if dbname not in self.connections:
self.connections[dbname] = sqlite3.connect(dbpath)
connection = self.connections[dbname]
if connection.cursor() is None:
self.logger.error("Database cursor does not exist")
raise CVEDBError
return cursor
return connection.cursor()

def decode_cpe23(self, cpe23) -> tuple[str, str, str]:
"""
Expand Down
19 changes: 4 additions & 15 deletions cve_bin_tool/parsers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from cve_bin_tool.parsers import Parser
from cve_bin_tool.strings import parse_strings
from cve_bin_tool.util import ProductInfo, ScanInfo


class PythonRequirementsParser(Parser):
Expand Down Expand Up @@ -106,6 +105,7 @@ def run_checker(self, filename):
if not result:
vendor = self.find_vendor(product, version)

vendor = self.deduplication(purl, vendor)
if vendor is not None:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")
Expand Down Expand Up @@ -159,23 +159,12 @@ def run_checker(self, filename):
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)

if vendor is not None:
yield from vendor

if not result:
vendor_package_pair = self.cve_db.get_vendor_product_pairs(product)
if vendor_package_pair != []:
for pair in vendor_package_pair:
vendor = pair["vendor"]
location = pair.get("location", self.filename)
file_path = self.filename
self.logger.debug(
f"{file_path} is {vendor}.{product} {version}"
)
yield ScanInfo(
ProductInfo(vendor, product, version, location), file_path
)

# There are packages with a METADATA file in them containing different data from what the tool expects
except AttributeError:
self.logger.debug(f"{filename} is an invalid METADATA/PKG-INFO")
Expand Down
3 changes: 0 additions & 3 deletions test/language_data/FAIL-PKG-INFO

This file was deleted.

1 change: 0 additions & 1 deletion test/test_language_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def test_javascript_package_none_found(self, filename: str) -> None:
@pytest.mark.parametrize(
"filename",
[
(str(TEST_FILE_PATH / "FAIL-PKG-INFO")),
(str(TEST_FILE_PATH / "fail_pom.xml")),
],
)
Expand Down