From 009d03b04453248835ea219c66f8f86b2dd982e6 Mon Sep 17 00:00:00 2001 From: NShani Date: Wed, 6 Oct 2021 11:34:25 +0530 Subject: [PATCH] Add Dependency Scan report Parser --- .../dojo_engagement_view_temp2.diff | 32 ++++ .../diff-files/dojo_utils_temp2.diff | 168 ++++++++++++++++++ .../diff-files/temp_dependency_parser.py | 99 +++++++++++ 3 files changed, 299 insertions(+) create mode 100644 external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_engagement_view_temp2.diff create mode 100644 external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_utils_temp2.diff create mode 100644 external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/temp_dependency_parser.py diff --git a/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_engagement_view_temp2.diff b/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_engagement_view_temp2.diff new file mode 100644 index 00000000..783cdf9e --- /dev/null +++ b/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_engagement_view_temp2.diff @@ -0,0 +1,32 @@ +diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py +index a86a49a2..41d966d9 100644 +--- a/dojo/engagement/views.py ++++ b/dojo/engagement/views.py +@@ -689,21 +689,14 @@ def import_scan_results(request, eid=None, pid=None): + + item.save(false_history=True, push_to_jira=push_to_jira) + +- if parser.note1 is not None: +- if str(parser.note1[i].entry) != ("nan"): +- item.notes.add(parser.note1[i]) ++ if parser.vul_influence_notes is not None: ++ if str(parser.vul_influence_notes[i].entry) != "nan": ++ item.notes.add(parser.vul_influence_notes[i]) + +- if parser.note2 is not None: +- if str(parser.note2[i].entry) != "nan": +- item.notes.add(parser.note2[i]) ++ if parser.wso2_resolution_notes is not None: ++ if str(parser.wso2_resolution_notes[i].entry) != "nan": ++ item.notes.add(parser.wso2_resolution_notes[i]) + +- if parser.note5 is not None: +- if str(parser.note5[i].entry) != "nan": +- item.notes.add(parser.note5[i]) +- +- if parser.note6 is not None: +- if str(parser.note6[i].entry) != "nan": +- item.notes.add(parser.note6[i]) + i += 1 + + if item.unsaved_tags is not None: diff --git a/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_utils_temp2.diff b/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_utils_temp2.diff new file mode 100644 index 00000000..0bbdba66 --- /dev/null +++ b/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/dojo_utils_temp2.diff @@ -0,0 +1,168 @@ +diff --git a/dojo/utils.py b/dojo/utils.py +index ef772b21..cd4cea8f 100644 +--- a/dojo/utils.py ++++ b/dojo/utils.py +@@ -34,6 +34,7 @@ from requests.auth import HTTPBasicAuth + from dojo.notifications.helper import create_notification + import logging + import itertools ++import re + + + logger = logging.getLogger(__name__) +@@ -112,15 +113,23 @@ def sync_dedupe(sender, *args, **kwargs): + if hasattr(settings, 'DEDUPLICATION_ATTRIBUTES'): + if new_finding.dynamic_finding == True: + attributes = settings.DEDUPLICATION_ATTRIBUTES['dynamic'] ++ deduplication_wso2_custom(new_finding, attributes) + elif new_finding.static_finding == True: + attributes = settings.DEDUPLICATION_ATTRIBUTES['static'] +- deduplication_wso2_custom(new_finding, attributes) ++ deduplication_wso2_custom(new_finding, attributes) ++ else: ++ deduplicate_legacy(new_finding) ++ elif deduplicationAlgorithm == settings.DEDUPE_ALGO_WSO2_SCA_CUSTOM: ++ if hasattr(settings, 'DEDUPLICATION_ATTRIBUTES'): ++ attributes = settings.DEDUPLICATION_ATTRIBUTES['sca'] ++ deduplication_wso2_sca_custom(new_finding, attributes) + else: + deduplicate_legacy(new_finding) + elif (deduplicationAlgorithm == settings.DEDUPE_ALGO_ATTRIBUTE_CONFIG) and hasattr(settings, + 'DEDUPLICATION_ATTRIBUTES') and hasattr( + settings, 'DEDUPLICATION_ALLOWED_ATTRIBUTES'): + configured_attributes = settings.DEDUPLICATION_ATTRIBUTES ++ + if (all(elem in settings.DEDUPLICATION_ALLOWED_ATTRIBUTES for elem in configured_attributes)): + deduplication_attr_config(new_finding, configured_attributes) + else: +@@ -540,11 +549,10 @@ def deduplication_wso2_custom(new_finding, attributes): + finding_filtered = finding_filtered.filter(**my_filter) + + similar_findings_product = finding_filtered.filter( +- test__engagement__product=new_finding.test.engagement.product) ++ test__engagement__product=new_finding.test.engagement.product).exclude(test=new_finding.test) + similar_findings_product = list(similar_findings_product) + original_finding = get_original_finding(new_finding, attributes, similar_findings_product) + +- print(original_finding.notes) + print("+++++++++++++++++++++++++++++++++++") + if original_finding is None: + product_name = new_finding.test.engagement.product.name +@@ -555,7 +563,7 @@ def deduplication_wso2_custom(new_finding, attributes): + print("+++++++++++++++++++++++++++++++++++ 2") + similar_findings_product_versions = finding_filtered.filter( + test__engagement__product__name__startswith=product_name).exclude( +- test__engagement__product=new_finding.test.engagement.product) ++ test__engagement__product=new_finding.test.engagement.product, test=new_finding.test) + print(similar_findings_product_versions) + print("+++++++++++++++++++++++++++++++++++ 3") + similar_findings_product_versions = list(similar_findings_product_versions) +@@ -584,58 +592,63 @@ def deduplication_wso2_custom(new_finding, attributes): + original_finding = None + + if original_finding is not None: +- print("+++++++++++++++++++++++++++++++++++ 4") +- print(new_finding.notes.all()) +- print(original_finding.notes.all()) +- # print(new_finding.notes[0]) +- # print(original_finding.notes[0]) + notes = original_finding.notes.all() + new_notes = new_finding.notes.all() +- print("_______________________________________") +- +- note1 = notes.filter(note_type__name="Use Case") +- print(note1) +- note2 = notes.filter(note_type__name="Vulnerability Influence") +- print(note2) +- note3 = new_notes.filter(note_type__name="Use Case") +- print(note3) +- note4 = new_notes.filter(note_type__name="Vulnerability Influence") +- print(note4) +- print("_______________________________________") +- print(note1.values('entry')) +- print(note3.values('entry')) +- print(note2.values('entry')) +- print(note4.values('entry')) +- +- +- if not note3: +- print("NOTE 3 IS NULL") +- else: +- print(str(note3.values('entry')) == str(note1.values('entry'))) +- if (str(note3.values('entry')) == str(note1.values('entry'))): +- print("_______________________________________ 1111") +- new_finding.notes.remove(note3[0]) +- if not note4: +- print("NOTE 4 IS NULL") ++ ++ new_finding.duplicate = True ++ new_finding.active = False ++ new_finding.verified = False ++ new_finding.duplicate_finding = original_finding ++ original_finding.duplicate_list.add(new_finding) ++ original_finding.found_by.add(new_finding.test.test_type) ++ super(Finding, new_finding).save() ++ ++ ++def deduplication_wso2_sca_custom(new_finding, attributes): ++ system_settings = System_Settings.objects.get() ++ if system_settings.enable_deduplication: ++ ++ if not new_finding.duplicate: ++ finding_filtered = Finding.objects.all().exclude(id=new_finding.id) ++ ++ for attr in attributes: ++ my_filter = {attr: getattr(new_finding, attr)} ++ finding_filtered = finding_filtered.filter(**my_filter) ++ ++ similar_findings_product = finding_filtered.filter( ++ test__engagement__product=new_finding.test.engagement.product).exclude(test=new_finding.test) ++ similar_findings_product = list(similar_findings_product) ++ ++ if similar_findings_product: ++ original_finding = sorted(similar_findings_product, key=lambda x: x.id, reverse=True)[0] ++ else: ++ original_finding = None ++ ++ if original_finding is None: ++ product_name = new_finding.test.engagement.product.name ++ product_name = re.split('( \d+)', product_name)[0] ++ ++ similar_findings_product_versions = finding_filtered.filter( ++ test__engagement__product__name__startswith=product_name).exclude( ++ test__engagement__product=new_finding.test.engagement.product, test=new_finding.test) ++ if similar_findings_product: ++ original_finding = sorted(similar_findings_product_versions, key=lambda x: x.id, reverse=True)[0] + else: +- print(str(note4.values('entry')) == str(note2.values('entry'))) +- if (str(note4.values('entry')) == str(note2.values('entry'))): +- print("_______________________________________ 2222") +- new_finding.notes.remove(note4[0]) +- # for note in notes: +- # print("________________________________________ 1") +- # print(note) +- # new_finding.notes.add(note) +- # if(note.note_type.name == "Use Case"): +- # Notes. +- # new_finding.notes.add() +- print("+++++++++++++++++++++++++++++++++++ 5") +- deduplicationLogger.debug('New finding ' + str(new_finding.id) + ' is a duplicate of existing finding ' + str(original_finding.id)) ++ original_finding = None ++ ++ if original_finding is None: ++ similar_findings_db = finding_filtered.exclude(test__engagement__product__name__startswith=product_name) ++ similar_findings_db = list(similar_findings_db) ++ if similar_findings_db: ++ original_finding = sorted(similar_findings_db, key=lambda x: x.id, reverse=True)[0] ++ ++ if original_finding is not None: ++ deduplicationLogger.debug('New finding ' + str(new_finding.id) + ' is a duplicate of existing finding ' ++ + str(original_finding.id)) + new_finding.duplicate = True + new_finding.active = False + new_finding.verified = False + new_finding.duplicate_finding = original_finding +- # original_finding.duplicate_list.add(new_finding) + original_finding.found_by.add(new_finding.test.test_type) + super(Finding, new_finding).save() + diff --git a/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/temp_dependency_parser.py b/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/temp_dependency_parser.py new file mode 100644 index 00000000..e8f255e7 --- /dev/null +++ b/external/VMS_Custom_Setup_Scripts/setup_scripts/diff-files/temp_dependency_parser.py @@ -0,0 +1,99 @@ +import csv +import hashlib +import os + +import pandas as pd +from tastypie import bundle + +from dojo.models import Finding, Notes, Note_Type, User + + +class TempParser(object): + # def __init__(self, filename, test): + # normalized_findings = self.normalize_findings(filename) + # self.ingest_findings(normalized_findings, test) + + def __init__(self, filename, test): + dupes = dict() + use_case_notes_dict = dict() + vul_influence_notes_dict = dict() + wso2_resolution_notes_dict = dict() + resolution_notes_dict = dict() + self.items = () + self.use_case_notes = () + self.vul_influence_notes = () + self.wso2_resolution_notes = () + self.resolution_notes = () + + if filename is None: + self.items = () + return + + df = pd.read_csv(filename, header=0) + + for i, row in df.iterrows(): + cwe = df.loc[i, 'cwe'] + title = df.loc[i, 'title'] + description = df.loc[i, 'description'] + sev = df.loc[i, 'severity'] + line = df.loc[i, 'line_number'] + issue_id = df.loc[i, 'issue_id'] + use_case_note = df.loc[i, 'Use_Case'] + vul_influence_note = df.loc[i, 'Vulnerability_Influence'] + resolution_note = df.loc[i, 'Resolution'] + sourcefilepath = df.loc[i, 'sourcefilepath'] + sourcefile = df.loc[i, 'sourcefile'] + mitigation = df.loc[i, 'mitigation'] + impact = df.loc[i, 'impact'] + WSO2_resolution = df.loc[i, 'WSO2_resolution'] + + dupe_key = sev + str(cwe) + str(line) + str(sourcefile) + str(sourcefilepath) + str(title) + str(issue_id) + + if dupe_key in dupes: + finding = dupes[dupe_key] + if finding.description: + finding.description = finding.description + "\nVulnerability ID: " + \ + df.loc[i, 'mitigation'] + # self.process_endpoints(finding, df, i) + dupes[dupe_key] = finding + else: + dupes[dupe_key] = True + + finding = Finding(title=title, + cwe=int(cwe), + test=test, + active=False, + verified=False, + severity=sev, + static_finding=True, + line_number=line, + file_path=sourcefilepath+sourcefile, + line=line, + sourcefile=sourcefile, + description=description, + numerical_severity=Finding. + get_numerical_severity(sev), + mitigation=mitigation, + impact=impact, + url='N/A') + + use_case_note = Notes(entry=use_case_note, note_type=Note_Type(id=2), author=User.objects.all().first()) + vul_influence_note = Notes(entry=vul_influence_note, note_type=Note_Type(id=3), author=User.objects.all().first()) + wso2_resolution_note = Notes(entry=WSO2_resolution, note_type=Note_Type(id=1), author=User.objects.all().first()) + resolution_note = Notes(entry=resolution_note, note_type=Note_Type(id=4), author=User.objects.all().first()) + use_case_note.save() + vul_influence_note.save() + wso2_resolution_note.save() + resolution_note.save() + + dupes[dupe_key] = finding + use_case_notes_dict[dupe_key] = use_case_note + vul_influence_notes_dict[dupe_key] = vul_influence_note + wso2_resolution_notes_dict[dupe_key] = wso2_resolution_note + resolution_notes_dict[dupe_key] = resolution_note + + self.items = list(dupes.values()) + self.use_case_notes = list(use_case_notes_dict.values()) + self.vul_influence_notes = list(vul_influence_notes_dict.values()) + self.wso2_resolution_notes = list(wso2_resolution_notes_dict.values()) + self.resolution_notes = list(resolution_notes_dict.values())