@@ -47,6 +47,7 @@ def __init__(self, cve_db, logger):
47
47
self .logger = logger
48
48
self .filename = ""
49
49
self .purl_pkg_type = "default"
50
+ self .connections = {}
50
51
51
52
def run_checker (self , filename ):
52
53
"""
@@ -117,7 +118,7 @@ def find_vendor_from_purl(self, purl, ver) -> tuple[list[ScanInfo], bool]:
117
118
UNION
118
119
SELECT cpe from purl2cpe WHERE purl LIKE ?
119
120
"""
120
- cursor = self .db_open_and_get_cursor ()
121
+ cursor = self .db_open_and_get_cursor ("purl2cpe/purl2cpe.db" )
121
122
cursor .execute (query , (param1 , param2 ))
122
123
cpeList = cursor .fetchall ()
123
124
vendorlist : list [ScanInfo ] = []
@@ -147,22 +148,69 @@ def find_vendor_from_purl(self, purl, ver) -> tuple[list[ScanInfo], bool]:
147
148
148
149
return vendorlist , True
149
150
except Exception as e :
150
- self .logger .error (f"Error occurred: { e } " )
151
+ self .logger .debug (f"Error occurred: { e } " )
152
+ self .logger .error ("Unable to access purl2cpe database." )
151
153
return [], False
152
154
153
- def db_open_and_get_cursor (self ) -> sqlite3 .Cursor :
154
- """Opens connection to sqlite database, returns cursor object."""
155
+ def deduplication (self , purl , vendorlist ) -> list [ScanInfo ]:
156
+ """
157
+ Modifies invalid vendors associated with a given PURL using the deduplication database.
155
158
156
- dbpath = (
157
- Path ("~" ).expanduser () / ".cache" / "cve-bin-tool" / "purl2cpe/purl2cpe.db"
158
- )
159
- connection = sqlite3 .connect (dbpath )
159
+ It queries the database for vendors associated with the PURL and filters the input 'vendorlist'
160
+ accordingly:
161
+
162
+ - If a vendor from 'vendorlist' is found in the database (valid vendor), it is added directly
163
+ to 'vendorlist_filtered'.
164
+ - If a vendor from 'vendorlist' is not found in the database (invalid vendor), a new ScanInfo
165
+ object is created with the vendor marked as 'UNKNOWN' and added to 'vendorlist_filtered'.
166
+
167
+ """
168
+ try :
169
+ purl = purl .to_dict ()
170
+ param = f"pkg:{ purl ['type' ]} /{ purl ['name' ]} "
171
+ query = """
172
+ SELECT vendor FROM deduplication WHERE purl LIKE ?
173
+ """
174
+ vendorlist_filtered : list [ScanInfo ] = []
175
+ cursor = self .db_open_and_get_cursor ("cve.db" )
176
+ cursor .execute (query , (param ,))
177
+
178
+ invalidVendorList = [i [0 ] for i in cursor .fetchall ()]
179
+
180
+ for item in vendorlist :
181
+ if item .product_info .vendor not in invalidVendorList :
182
+ vendorlist_filtered .append (item )
183
+
184
+ if len (vendorlist_filtered ) == 0 :
185
+ vendorlist_filtered .append (
186
+ ScanInfo (
187
+ ProductInfo (
188
+ "UNKNOWN" ,
189
+ item .product_info .product ,
190
+ item .product_info .version ,
191
+ item .file_path ,
192
+ item .product_info .purl ,
193
+ ),
194
+ item .file_path ,
195
+ )
196
+ )
197
+ return vendorlist_filtered
198
+ except Exception as e :
199
+ self .logger .debug (f"error: { e } " )
200
+ self .logger .error ("Unable to access deduplication database." )
201
+ return vendorlist
202
+
203
+ def db_open_and_get_cursor (self , dbname ) -> sqlite3 .Cursor :
204
+ """Opens connection to sqlite database, returns cursor object."""
160
205
161
- if connection is not None :
162
- cursor = connection .cursor ()
163
- if cursor is None :
206
+ dbpath = Path ("~" ).expanduser () / ".cache" / "cve-bin-tool" / dbname
207
+ if dbname not in self .connections :
208
+ self .connections [dbname ] = sqlite3 .connect (dbpath )
209
+ connection = self .connections [dbname ]
210
+ if connection .cursor () is None :
211
+ self .logger .error ("Database cursor does not exist" )
164
212
raise CVEDBError
165
- return cursor
213
+ return connection . cursor ()
166
214
167
215
def decode_cpe23 (self , cpe23 ) -> tuple [str , str , str ]:
168
216
"""
0 commit comments