Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cve_bin_tool/data_sources/curl_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@


class Curl_Source(Data_Source):
"""Represents a data source for retrieving information about vulnerabilities in cURL."""
SOURCE = "Curl"
CACHEDIR = DISK_LOCATION_DEFAULT
BACKUPCACHEDIR = DISK_LOCATION_BACKUP
LOGGER = LOGGER.getChild("CVEDB")
DATA_SOURCE_LINK = "https://curl.se/docs/vuln.json"

def __init__(self, error_mode=ErrorMode.TruncTrace):
"""Initialize a Curl_Source instance. Args: error_mode (ErrorMode): The error mode to be used."""
self.cve_list = None
self.cachedir = self.CACHEDIR
self.backup_cachedir = self.BACKUPCACHEDIR
Expand All @@ -40,12 +42,14 @@ def __init__(self, error_mode=ErrorMode.TruncTrace):
self.vulnerability_data = []

async def get_cve_data(self):
"""Get cURL vulnerability data. Fetches the cURL vulnerability data and retrieves a list of affected data. Returns: Tuple[Union[None, Any], str]: A tuple containing the affected data and the source name."""
await self.fetch_cves()
self.get_cve_list()

return (None, self.affected_data), self.source_name

async def fetch_cves(self):
"""Fetch cURL vulnerabilities data."""
if not self.session:
connector = aiohttp.TCPConnector(limit_per_host=19)
self.session = RateLimiter(
Expand All @@ -57,6 +61,7 @@ async def fetch_cves(self):
await self.session.close()

async def download_curl_vulnerabilities(self, session: RateLimiter) -> None:
"""Download cURL vulnerability data and save it to a file. Args: session (RateLimiter): The session to use for the HTTP request."""
async with await session.get(self.DATA_SOURCE_LINK) as response:
response.raise_for_status()
self.vulnerability_data = await response.json()
Expand All @@ -66,6 +71,7 @@ async def download_curl_vulnerabilities(self, session: RateLimiter) -> None:
await f.write(json.dumps(self.vulnerability_data, indent=4))

def get_cve_list(self):
"""Get a list of affected cURL vulnerabilities."""
self.affected_data = []

for cve in self.vulnerability_data:
Expand Down