Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
'deepmerge==1.1.0',
'jmespath==1.0.1',
'python-hcl2==4.3.2',
'requests==2.32.3',
'requests==2.32.4',
'fastapi>=0.115.2,<0.116.0',
"python-multipart==0.0.19",
'click==8.1.7',
Expand Down
15 changes: 8 additions & 7 deletions sl_util/sl_util/iterations_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# This weird while needs to be done because:
# - The foreach does not work if you remove an element from the list
# - The remove has to be invoked on the list to affect the object passed as a class attribute
from typing import List, Union
from typing import Union


def remove_from_list(collection: List,
def remove_from_list(collection: list,
filter_function,
remove_function=None) -> None:
if collection is None:
Expand All @@ -18,7 +18,7 @@ def remove_from_list(collection: List,
i += 1


def remove_duplicates(duplicated_list: List) -> List:
def remove_duplicates(duplicated_list: list) -> list:
unique_list = []

for element in duplicated_list:
Expand All @@ -28,7 +28,7 @@ def remove_duplicates(duplicated_list: List) -> List:
return unique_list


def compare_unordered_list_or_string(a: Union[str, List], b: Union[str, List]) -> bool:
def compare_unordered_list_or_string(a: Union[str, list], b: Union[str, list]) -> bool:
try:
if isinstance(a, str) and isinstance(b, str):
return a == b
Expand All @@ -40,20 +40,21 @@ def compare_unordered_list_or_string(a: Union[str, List], b: Union[str, List]) -
return sorted(a) == sorted(b)
except TypeError:
return False
return False


def remove_nones(list: List) -> List:
def remove_nones(list: list) -> list:
return [e for e in list if e != None]


def remove_keys(dictionary: dict, keys_to_remove: [str]) -> dict:
def remove_keys(dictionary: dict, keys_to_remove: list[str]) -> dict:
filtered = dictionary.copy()
for key_to_remove in keys_to_remove:
if key_to_remove in filtered:
filtered.pop(key_to_remove)
return filtered


def append_if_not_exists(element, _list: List):
def append_if_not_exists(element, _list: list):
if element not in _list:
_list.append(element)
4 changes: 2 additions & 2 deletions slp_visio/slp_visio/load/objects/diagram_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ def __str__(self) -> str:
class Diagram:
def __init__(self,
diagram_type: DiagramType,
components: [DiagramComponent],
connectors: [DiagramConnector],
components: list[DiagramComponent],
connectors: list[DiagramConnector],
limits: DiagramLimits = None):
self.diagram_type = diagram_type
self.components = components
Expand Down
71 changes: 24 additions & 47 deletions slp_visio/slp_visio/parse/lucid_parser.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
from typing import Union, List
from typing import Optional

from sl_util.sl_util import secure_regex
from sl_util.sl_util.iterations_utils import remove_keys
from slp_visio.slp_visio.load.objects.diagram_objects import DiagramComponent, Diagram
from slp_visio.slp_visio.load.objects.diagram_objects import Diagram
from slp_visio.slp_visio.load.visio_mapping_loader import VisioMappingFileLoader
from slp_visio.slp_visio.parse.visio_parser import VisioParser, _match_resource

AWS_REGEX = [r".*2017$", r".*AWS19$", r".*AWS2021$"]
AZURE_REGEX = [r"^AC.*Block$", r"^AE.*Block$", r"^AGS.*Block$", r"^AVM.*Block$", r".*Azure2019$", r".*Azure2021$"]
LUCID_CATCH_ALL_REGEX = AWS_REGEX + AZURE_REGEX


def _get_diagram_component_mapping_by_catch_all(resource: DiagramComponent, catch_all_config: [str]) \
-> Union[None, dict]:
for regex in LUCID_CATCH_ALL_REGEX:
if secure_regex.match(regex, resource.type):
return {'label': resource.type, 'type': catch_all_config}


class LucidParser(VisioParser):

def __init__(self, project_id: str, project_name: str, diagram: Diagram, mapping_loader: VisioMappingFileLoader):
super().__init__(project_id, project_name, diagram, mapping_loader)

def _get_component_mappings(self) -> [dict]:
def _get_component_mappings(self) -> dict:
"""
Returns the component mappings.
After the component mappings are determined, the catch all mappings is determined.
Expand All @@ -36,39 +23,29 @@ def _get_component_mappings(self) -> [dict]:

return self.__prune_skip_components({**catch_all_components, **component_mappings})

def __get_catch_all_mappings(self, ids_to_skip) -> [dict]:
result = {}
catch_all_config = self.__get_catch_all_config()
if not catch_all_config:
return result
for diag_component in self.diagram.components:
if diag_component.id in ids_to_skip:
continue
mapping = _get_diagram_component_mapping_by_catch_all(diag_component, catch_all_config)
if mapping:
result[diag_component.id] = mapping
return result

def __get_catch_all_config(self):
catch_all = self.mapping_loader.configuration.get('catch_all', False)
if not catch_all or catch_all.lower() == 'false':
return

return catch_all.strip()
def __get_catch_all_mappings(self, ids_to_skip) -> dict:
catch_all_type = self.__get_catch_all_type()
return {
c.id: {'label': c.type, 'type': catch_all_type}
for c in self.diagram.components
if c.id not in ids_to_skip
} if catch_all_type else {}

def __get_skip_config(self) -> List[str]:
return self.mapping_loader.configuration.get('skip')
def __get_catch_all_type(self) -> Optional[str]:
catch_all = self.mapping_loader.configuration.get('catch_all')
return catch_all.strip() if catch_all and catch_all.lower() != 'false' else None

def __prune_skip_components(self, mappings: dict) -> dict:
ids_to_skip = self.__get_ids_to_skip()
return remove_keys(mappings, ids_to_skip)
return remove_keys(mappings, self.__get_ids_to_skip())

def __get_ids_to_skip(self) -> List[str]:
ids_to_skip = []
def __get_ids_to_skip(self) -> list[str]:
skip_config = self.__get_skip_config()
if skip_config:
for component in self.diagram.components:
for skip in skip_config:
if _match_resource(component.type, skip):
ids_to_skip.append(component.id)
return ids_to_skip
return [
component.id
for component in self.diagram.components
for skip in skip_config
if _match_resource(component.type, skip)
] if skip_config else []

def __get_skip_config(self) -> list[str]:
return self.mapping_loader.configuration.get('skip')
20 changes: 11 additions & 9 deletions slp_visio/slp_visio/parse/mappers/diagram_component_mapper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import Dict

from otm.otm.entity.component import Component
from otm.otm.entity.trustzone import Trustzone
from slp_base import MappingFileNotValidError
Expand All @@ -8,25 +6,29 @@
from slp_visio.slp_visio.parse.representation.representation_calculator import RepresentationCalculator


def _normalize_component_name(name: str) -> str:
return f'_{name}' if len(name) == 1 else name


class DiagramComponentMapper(DiagramMapper):

def __init__(self,
components: [DiagramComponent],
component_mappings: Dict[str, dict],
trustzone_mappings: Dict[str, dict],
components: list[DiagramComponent],
component_mappings: dict[str, dict],
trustzone_mappings: dict[str, dict],
default_trustzone: Trustzone,
representation_calculator: RepresentationCalculator):
self.components: [DiagramComponent] = components
self.components: list[DiagramComponent] = components
self.component_mappings = component_mappings
self.trustzone_mappings = trustzone_mappings
self.default_trustzone = default_trustzone

self.representation_calculator = representation_calculator

def to_otm(self) -> [Component]:
def to_otm(self) -> list[Component]:
return self.__map_to_otm(self.components)

def __map_to_otm(self, components: [DiagramComponent]) -> [Component]:
def __map_to_otm(self, components: list[DiagramComponent]) -> list[Component]:
otm_components = []

for diag_component in components:
Expand All @@ -43,7 +45,7 @@ def __build_otm_component(self, diagram_component: DiagramComponent, otm_type: s

return Component(
component_id=diagram_component.id,
name=diagram_component.name,
name=_normalize_component_name(diagram_component.name),
component_type=otm_type,
parent=self.__calculate_parent_id(diagram_component),
parent_type=self._calculate_parent_type(diagram_component),
Expand Down
27 changes: 14 additions & 13 deletions slp_visio/tests/unit/parse/mappers/test_diagram_component_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

import pytest

from slp_visio.slp_visio.load.objects.diagram_objects import DiagramComponent

from slp_base import MappingFileNotValidError
from slp_visio.slp_visio.parse.mappers.diagram_component_mapper import DiagramComponentMapper

tz1 = MagicMock(id='tz1')
c1 = MagicMock(id='c1')
tz1 = DiagramComponent(id='tz1')
c1 = DiagramComponent(id='c1', name='Component 1')
c1.parent = tz1
c2 = MagicMock(id='c2')
c2 = DiagramComponent(id='c2', name='Component 2')
c2.parent = c1
c3 = MagicMock(id='c3')
c3 = DiagramComponent(id='c3', name='3')
c3.parent = None
c4 = MagicMock(id='c4')
c4 = DiagramComponent(id='c4', name='Component 4')


diagram_components = [tz1, c1, c2, c3, c4]
Expand Down Expand Up @@ -53,17 +55,20 @@ def test_to_otm(self):
assert components[0].id == 'c1'
assert components[0].type == 'type-1'
assert components[0].parent == 'tz1'
assert components[0].name == c1.name

assert components[1].id == 'c2'
assert components[1].type == 'type-2'
assert components[1].parent == 'c1'
assert components[1].name == c2.name

assert components[2].id == 'c3'
assert components[2].type == 'type-3'
assert components[2].parent == default_trustzone.id
assert components[2].name == f"_{c3.name}"

def test_not_default_trustzone(self):
# GIVEN the diagram component mapper without default trustzone
# GIVEN the diagram component mapper without a default trustzone
diagram_component_mapper = DiagramComponentMapper(
diagram_components,
component_mappings,
Expand All @@ -72,15 +77,11 @@ def test_not_default_trustzone(self):
representation_calculator
)

# WHEN to_otm is called an exception is raised
# WHEN to_otm is called, expect an exception
with pytest.raises(MappingFileNotValidError) as error:
diagram_component_mapper.to_otm()

# THEN the exception is raised
# THEN an exception is raised
assert error.value.title == 'Mapping files are not valid'
assert error.value.detail == 'No default trust zone has been defined in the mapping file'
assert error.value.message == 'Please, add a default trust zone'




assert error.value.message == 'Please, add a default trust zone'
15 changes: 5 additions & 10 deletions slp_visio/tests/unit/parse/test_lucid_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,12 @@ def test_none_catch_all_config(self, catch_all_config):
assert len(component_mappings) == 0

@pytest.mark.parametrize('shape_type', [
pytest.param("AmazonEC22017", id='2017'),
pytest.param("DatabaseAWS19", id='AWS19'),
pytest.param("AWSCloudAWS2021", id='AWS2021$'),
pytest.param("ACAccessControlBlock", id='AC.*Block'),
pytest.param("AEAndroidPhoneBlock", id='AE.*Block'),
pytest.param("AGSUserBlock", id='AGS.*Block'),
pytest.param("AVMActiveDirectoryVMBlock", id='AVM.*Block'),
pytest.param("AzureDatabaseforPostgreSQLServersAzure2019", id='Azure2019'),
pytest.param("WebApplicationFirewallPoliciesWAFAzure2021", id='Azure2021$'),
pytest.param("AmazonEC2", id='aws shape'),
pytest.param("AzureCloud", id='azure shape'),
pytest.param("Database", id='infrastructure shape'),
pytest.param("GenericShape", id='generic shape'),
])
def test_catch_all_by_regex(self, shape_type):
def test_catch_all_applies_to_all_shapes(self, shape_type):
# GIVEN a mapping loader with catch_all configuration
mapping_loader = MagicMock(configuration={'catch_all': 'empty-component'})
# AND a diagram with a shape of the given type
Expand Down