Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py
index a86a49a2..41d966d9 100644
--- a/dojo/engagement/views.py
+++ b/dojo/engagement/views.py
@@ -689,21 +689,14 @@ def import_scan_results(request, eid=None, pid=None):

item.save(false_history=True, push_to_jira=push_to_jira)

- if parser.note1 is not None:
- if str(parser.note1[i].entry) != ("nan"):
- item.notes.add(parser.note1[i])
+ if parser.vul_influence_notes is not None:
+ if str(parser.vul_influence_notes[i].entry) != "nan":
+ item.notes.add(parser.vul_influence_notes[i])

- if parser.note2 is not None:
- if str(parser.note2[i].entry) != "nan":
- item.notes.add(parser.note2[i])
+ if parser.wso2_resolution_notes is not None:
+ if str(parser.wso2_resolution_notes[i].entry) != "nan":
+ item.notes.add(parser.wso2_resolution_notes[i])

- if parser.note5 is not None:
- if str(parser.note5[i].entry) != "nan":
- item.notes.add(parser.note5[i])
-
- if parser.note6 is not None:
- if str(parser.note6[i].entry) != "nan":
- item.notes.add(parser.note6[i])
i += 1

if item.unsaved_tags is not None:
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
diff --git a/dojo/utils.py b/dojo/utils.py
index ef772b21..cd4cea8f 100644
--- a/dojo/utils.py
+++ b/dojo/utils.py
@@ -34,6 +34,7 @@ from requests.auth import HTTPBasicAuth
from dojo.notifications.helper import create_notification
import logging
import itertools
+import re


logger = logging.getLogger(__name__)
@@ -112,15 +113,23 @@ def sync_dedupe(sender, *args, **kwargs):
if hasattr(settings, 'DEDUPLICATION_ATTRIBUTES'):
if new_finding.dynamic_finding == True:
attributes = settings.DEDUPLICATION_ATTRIBUTES['dynamic']
+ deduplication_wso2_custom(new_finding, attributes)
elif new_finding.static_finding == True:
attributes = settings.DEDUPLICATION_ATTRIBUTES['static']
- deduplication_wso2_custom(new_finding, attributes)
+ deduplication_wso2_custom(new_finding, attributes)
+ else:
+ deduplicate_legacy(new_finding)
+ elif deduplicationAlgorithm == settings.DEDUPE_ALGO_WSO2_SCA_CUSTOM:
+ if hasattr(settings, 'DEDUPLICATION_ATTRIBUTES'):
+ attributes = settings.DEDUPLICATION_ATTRIBUTES['sca']
+ deduplication_wso2_sca_custom(new_finding, attributes)
else:
deduplicate_legacy(new_finding)
elif (deduplicationAlgorithm == settings.DEDUPE_ALGO_ATTRIBUTE_CONFIG) and hasattr(settings,
'DEDUPLICATION_ATTRIBUTES') and hasattr(
settings, 'DEDUPLICATION_ALLOWED_ATTRIBUTES'):
configured_attributes = settings.DEDUPLICATION_ATTRIBUTES
+
if (all(elem in settings.DEDUPLICATION_ALLOWED_ATTRIBUTES for elem in configured_attributes)):
deduplication_attr_config(new_finding, configured_attributes)
else:
@@ -540,11 +549,10 @@ def deduplication_wso2_custom(new_finding, attributes):
finding_filtered = finding_filtered.filter(**my_filter)

similar_findings_product = finding_filtered.filter(
- test__engagement__product=new_finding.test.engagement.product)
+ test__engagement__product=new_finding.test.engagement.product).exclude(test=new_finding.test)
similar_findings_product = list(similar_findings_product)
original_finding = get_original_finding(new_finding, attributes, similar_findings_product)

- print(original_finding.notes)
print("+++++++++++++++++++++++++++++++++++")
if original_finding is None:
product_name = new_finding.test.engagement.product.name
@@ -555,7 +563,7 @@ def deduplication_wso2_custom(new_finding, attributes):
print("+++++++++++++++++++++++++++++++++++ 2")
similar_findings_product_versions = finding_filtered.filter(
test__engagement__product__name__startswith=product_name).exclude(
- test__engagement__product=new_finding.test.engagement.product)
+ test__engagement__product=new_finding.test.engagement.product, test=new_finding.test)
print(similar_findings_product_versions)
print("+++++++++++++++++++++++++++++++++++ 3")
similar_findings_product_versions = list(similar_findings_product_versions)
@@ -584,58 +592,63 @@ def deduplication_wso2_custom(new_finding, attributes):
original_finding = None

if original_finding is not None:
- print("+++++++++++++++++++++++++++++++++++ 4")
- print(new_finding.notes.all())
- print(original_finding.notes.all())
- # print(new_finding.notes[0])
- # print(original_finding.notes[0])
notes = original_finding.notes.all()
new_notes = new_finding.notes.all()
- print("_______________________________________")
-
- note1 = notes.filter(note_type__name="Use Case")
- print(note1)
- note2 = notes.filter(note_type__name="Vulnerability Influence")
- print(note2)
- note3 = new_notes.filter(note_type__name="Use Case")
- print(note3)
- note4 = new_notes.filter(note_type__name="Vulnerability Influence")
- print(note4)
- print("_______________________________________")
- print(note1.values('entry'))
- print(note3.values('entry'))
- print(note2.values('entry'))
- print(note4.values('entry'))
-
-
- if not note3:
- print("NOTE 3 IS NULL")
- else:
- print(str(note3.values('entry')) == str(note1.values('entry')))
- if (str(note3.values('entry')) == str(note1.values('entry'))):
- print("_______________________________________ 1111")
- new_finding.notes.remove(note3[0])
- if not note4:
- print("NOTE 4 IS NULL")
+
+ new_finding.duplicate = True
+ new_finding.active = False
+ new_finding.verified = False
+ new_finding.duplicate_finding = original_finding
+ original_finding.duplicate_list.add(new_finding)
+ original_finding.found_by.add(new_finding.test.test_type)
+ super(Finding, new_finding).save()
+
+
+def deduplication_wso2_sca_custom(new_finding, attributes):
+ system_settings = System_Settings.objects.get()
+ if system_settings.enable_deduplication:
+
+ if not new_finding.duplicate:
+ finding_filtered = Finding.objects.all().exclude(id=new_finding.id)
+
+ for attr in attributes:
+ my_filter = {attr: getattr(new_finding, attr)}
+ finding_filtered = finding_filtered.filter(**my_filter)
+
+ similar_findings_product = finding_filtered.filter(
+ test__engagement__product=new_finding.test.engagement.product).exclude(test=new_finding.test)
+ similar_findings_product = list(similar_findings_product)
+
+ if similar_findings_product:
+ original_finding = sorted(similar_findings_product, key=lambda x: x.id, reverse=True)[0]
+ else:
+ original_finding = None
+
+ if original_finding is None:
+ product_name = new_finding.test.engagement.product.name
+ product_name = re.split('( \d+)', product_name)[0]
+
+ similar_findings_product_versions = finding_filtered.filter(
+ test__engagement__product__name__startswith=product_name).exclude(
+ test__engagement__product=new_finding.test.engagement.product, test=new_finding.test)
+ if similar_findings_product_versions:
+ original_finding = sorted(similar_findings_product_versions, key=lambda x: x.id, reverse=True)[0]
else:
- print(str(note4.values('entry')) == str(note2.values('entry')))
- if (str(note4.values('entry')) == str(note2.values('entry'))):
- print("_______________________________________ 2222")
- new_finding.notes.remove(note4[0])
- # for note in notes:
- # print("________________________________________ 1")
- # print(note)
- # new_finding.notes.add(note)
- # if(note.note_type.name == "Use Case"):
- # Notes.
- # new_finding.notes.add()
- print("+++++++++++++++++++++++++++++++++++ 5")
- deduplicationLogger.debug('New finding ' + str(new_finding.id) + ' is a duplicate of existing finding ' + str(original_finding.id))
+ original_finding = None
+
+ if original_finding is None:
+ similar_findings_db = finding_filtered.exclude(test__engagement__product__name__startswith=product_name)
+ similar_findings_db = list(similar_findings_db)
+ if similar_findings_db:
+ original_finding = sorted(similar_findings_db, key=lambda x: x.id, reverse=True)[0]
+
+ if original_finding is not None:
+ deduplicationLogger.debug('New finding ' + str(new_finding.id) + ' is a duplicate of existing finding '
+ + str(original_finding.id))
new_finding.duplicate = True
new_finding.active = False
new_finding.verified = False
new_finding.duplicate_finding = original_finding
- # original_finding.duplicate_list.add(new_finding)
original_finding.found_by.add(new_finding.test.test_type)
super(Finding, new_finding).save()

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import csv
import hashlib
import os

import pandas as pd
from tastypie import bundle

from dojo.models import Finding, Notes, Note_Type, User


class TempParser(object):
    """Parse a WSO2 static-scan CSV export into DefectDojo findings.

    One ``Finding`` is built per unique CSV row (rows are deduplicated on a
    key of severity, CWE, line number, source file/path, title and issue id),
    and four ``Notes`` rows (Use Case, Vulnerability Influence, WSO2
    Resolution, Resolution) are saved for each new finding.  The resulting
    lists — ``items``, ``use_case_notes``, ``vul_influence_notes``,
    ``wso2_resolution_notes`` and ``resolution_notes`` — are index-aligned,
    so a caller can attach note ``i`` to finding ``i``.
    """

    def __init__(self, filename, test):
        """Read *filename* and build findings belonging to *test*.

        :param filename: CSV path or buffer accepted by ``pandas.read_csv``;
            when ``None`` the parser is left empty (``items`` stays ``()``).
        :param test: the Test model instance the findings are attached to.
        """
        dupes = dict()
        use_case_notes_dict = dict()
        vul_influence_notes_dict = dict()
        wso2_resolution_notes_dict = dict()
        resolution_notes_dict = dict()
        self.items = ()
        self.use_case_notes = ()
        self.vul_influence_notes = ()
        self.wso2_resolution_notes = ()
        self.resolution_notes = ()

        if filename is None:
            return

        df = pd.read_csv(filename, header=0)

        for i, row in df.iterrows():
            cwe = row['cwe']
            title = row['title']
            description = row['description']
            sev = row['severity']
            line = row['line_number']
            issue_id = row['issue_id']
            use_case_entry = row['Use_Case']
            vul_influence_entry = row['Vulnerability_Influence']
            resolution_entry = row['Resolution']
            sourcefilepath = row['sourcefilepath']
            sourcefile = row['sourcefile']
            mitigation = row['mitigation']
            impact = row['impact']
            wso2_resolution_entry = row['WSO2_resolution']

            dupe_key = (sev + str(cwe) + str(line) + str(sourcefile) +
                        str(sourcefilepath) + str(title) + str(issue_id))

            if dupe_key in dupes:
                # Repeated row: fold its mitigation into the existing
                # finding's description instead of creating a duplicate
                # finding (and orphan notes).
                finding = dupes[dupe_key]
                if finding.description:
                    finding.description = (finding.description +
                                           "\nVulnerability ID: " + mitigation)
                dupes[dupe_key] = finding
            else:
                finding = Finding(title=title,
                                  cwe=int(cwe),
                                  test=test,
                                  active=False,
                                  verified=False,
                                  severity=sev,
                                  static_finding=True,
                                  line_number=line,
                                  file_path=sourcefilepath + sourcefile,
                                  line=line,
                                  sourcefile=sourcefile,
                                  description=description,
                                  numerical_severity=Finding.get_numerical_severity(sev),
                                  mitigation=mitigation,
                                  impact=impact,
                                  url='N/A')

                # Author defaults to the first user in the DB — assumes at
                # least one user exists; TODO(review): confirm a dedicated
                # service account should not be used instead.
                author = User.objects.all().first()
                use_case_note = Notes(entry=use_case_entry,
                                      note_type=Note_Type(id=2), author=author)
                vul_influence_note = Notes(entry=vul_influence_entry,
                                           note_type=Note_Type(id=3), author=author)
                wso2_resolution_note = Notes(entry=wso2_resolution_entry,
                                             note_type=Note_Type(id=1), author=author)
                resolution_note = Notes(entry=resolution_entry,
                                        note_type=Note_Type(id=4), author=author)
                use_case_note.save()
                vul_influence_note.save()
                wso2_resolution_note.save()
                resolution_note.save()

                dupes[dupe_key] = finding
                use_case_notes_dict[dupe_key] = use_case_note
                vul_influence_notes_dict[dupe_key] = vul_influence_note
                wso2_resolution_notes_dict[dupe_key] = wso2_resolution_note
                resolution_notes_dict[dupe_key] = resolution_note

        # dicts preserve insertion order, so these five lists stay aligned.
        self.items = list(dupes.values())
        self.use_case_notes = list(use_case_notes_dict.values())
        self.vul_influence_notes = list(vul_influence_notes_dict.values())
        self.wso2_resolution_notes = list(wso2_resolution_notes_dict.values())
        self.resolution_notes = list(resolution_notes_dict.values())