From 79decf8d2a9e87a1e73ead8a3936c7f9016c26dc Mon Sep 17 00:00:00 2001 From: chenailin11 Date: Mon, 19 Jan 2026 14:28:13 +0800 Subject: [PATCH] Add Parse for LinkState TLV Types 263 --- yabgp/message/attribute/linkstate/__init__.py | 1 + .../message/attribute/linkstate/node/mt_id.py | 51 ++++++ yabgp/message/attribute/nlri/linkstate.py | 9 +- .../attribute/linkstate/node/test_mt_id.py | 167 ++++++++++++++++++ 4 files changed, 222 insertions(+), 6 deletions(-) create mode 100644 yabgp/message/attribute/linkstate/node/mt_id.py create mode 100644 yabgp/tests/unit/message/attribute/linkstate/node/test_mt_id.py diff --git a/yabgp/message/attribute/linkstate/__init__.py b/yabgp/message/attribute/linkstate/__init__.py index 1dc118f..3ce4853 100644 --- a/yabgp/message/attribute/linkstate/__init__.py +++ b/yabgp/message/attribute/linkstate/__init__.py @@ -15,6 +15,7 @@ from .linkstate import LinkState # noqa from .node.local_router_id import LocalRouterID # noqa +from .node.mt_id import MultiTopologyIdentifier # noqa from .node.name import NodeName # noqa from .node.flex_algo_define import FlexAlgorithmDefine # noqa from .node.isisarea import ISISArea # noqa diff --git a/yabgp/message/attribute/linkstate/node/mt_id.py b/yabgp/message/attribute/linkstate/node/mt_id.py new file mode 100644 index 0000000..10198c6 --- /dev/null +++ b/yabgp/message/attribute/linkstate/node/mt_id.py @@ -0,0 +1,51 @@ +# Copyright 2025 Cisco Systems, Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import struct +from yabgp.tlv import TLV +from ..linkstate import LinkState + + +@LinkState.register() +class MultiTopologyIdentifier(TLV): + """MultiTopologyIdentifier TLV (Type 263) + + RFC 7752 Section 3.2.1.5: + https://datatracker.ietf.org/doc/html/rfc7752#section-3.2.1.5 + + Format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type | Length=2*n | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |R R R R| Multi-Topology ID 1 | .... // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // .... |R R R R| Multi-Topology ID n | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + The MT-ID field is a 12-bit field (with 4 reserved bits). + Multiple MT-IDs can be present in the TLV (each 2 bytes). + """ + TYPE = 263 + TYPE_STR = 'mt_id' + + @classmethod + def unpack(cls, data): + mt_id = [] + while data: + mt_id.append(struct.unpack('!H', data[:2])[0]) + data = data[2:] + return cls(value=mt_id) diff --git a/yabgp/message/attribute/nlri/linkstate.py b/yabgp/message/attribute/nlri/linkstate.py index 3081cfd..c394c70 100644 --- a/yabgp/message/attribute/nlri/linkstate.py +++ b/yabgp/message/attribute/nlri/linkstate.py @@ -21,6 +21,7 @@ import netaddr +from yabgp.message.attribute.linkstate.node.mt_id import MultiTopologyIdentifier from yabgp.message.attribute.nlri import NLRI @@ -144,12 +145,8 @@ def parse_nlri(cls, data, nlri_type): ipv6_neighbor_addr = str(netaddr.IPAddress(int(binascii.b2a_hex(value), 16))) descriptor['type'] = 'link_remote_ipv6' descriptor['value'] = ipv6_neighbor_addr - elif _type == 263: # Multi-Topology Identifier - descriptor['type'] = 'mt_id' - descriptor['value'] = [] - while value: - descriptor['value'].append(struct.unpack('!H', value[:2])[0]) - value = value[2:] + elif _type == MultiTopologyIdentifier.TYPE: # Multi-Topology Identifier + descriptor = MultiTopologyIdentifier.unpack(value).dict() elif _type == 264: # OSPF Route Type descriptor['type'] = 'prefix_ospf_route_type' descriptor['value'] = ord(value[0:1]) diff --git a/yabgp/tests/unit/message/attribute/linkstate/node/test_mt_id.py b/yabgp/tests/unit/message/attribute/linkstate/node/test_mt_id.py new file mode 100644 index 0000000..5c8dcb7 --- /dev/null +++ b/yabgp/tests/unit/message/attribute/linkstate/node/test_mt_id.py @@ -0,0 +1,167 @@ +# Copyright 2025 Cisco Systems, Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" Test Multi-Topology Identifier TLV """ + +import unittest + +from yabgp.message.attribute.linkstate.node.mt_id import MultiTopologyIdentifier + + +class TestMultiTopologyIdentifier(unittest.TestCase): + """Test MultiTopologyIdentifier TLV (Type 263) + """ + + def test_unpack_single_mt_id(self): + """Test unpack with single MT-ID + + Data: 00 02 + - Reserved: 0x0 + - MT-ID: 0x002 = 2 (IPv6 unicast) + """ + data_bin = bytes.fromhex('0002') + + expected = { + 'type': 'mt_id', + 'value': [2] + } + + self.assertEqual(expected, MultiTopologyIdentifier.unpack(data_bin).dict()) + + def test_unpack_multiple_mt_ids(self): + """Test unpack with multiple MT-IDs + + Data: 00 00 00 02 00 03 + - MT-ID 1: 0x0000 = 0 (default topology) + - MT-ID 2: 0x0002 = 2 (IPv6 unicast) + - MT-ID 3: 0x0003 = 3 (IPv4 multicast) + """ + data_bin = bytes.fromhex('000000020003') + + expected = { + 'type': 'mt_id', + 'value': [0, 2, 3] + } + + self.assertEqual(expected, MultiTopologyIdentifier.unpack(data_bin).dict()) + + def test_unpack_empty_data(self): + """Test unpack with empty data + + Data: (empty) + Expected: Empty MT-ID list + """ + data_bin = b'' + + expected = { + 'type': 'mt_id', + 'value': [] + } + + self.assertEqual(expected, MultiTopologyIdentifier.unpack(data_bin).dict()) + + def test_unpack_default_topology(self): + """Test unpack with default topology (MT-ID = 0) + + Data: 00 00 + - MT-ID: 0x0000 = 0 (default topology) + """ + data_bin = bytes.fromhex('0000') + + expected = { + 'type': 'mt_id', + 'value': [0] + } + + self.assertEqual(expected, MultiTopologyIdentifier.unpack(data_bin).dict()) + + def test_unpack_max_mt_id(self): + """Test unpack with maximum MT-ID value (0xFFFF) + + Data: ff ff + - MT-ID: 0xFFFF = 65535 (maximum value) + """ + data_bin = bytes.fromhex('ffff') + + expected = { + 'type': 'mt_id', + 'value': [65535] + } + + self.assertEqual(expected, MultiTopologyIdentifier.unpack(data_bin).dict()) + + def test_unpack_common_mt_ids(self): + """Test unpack with common MT-ID values + + Common MT-IDs (RFC 5120): + - 0: Default topology + - 2: IPv6 unicast topology + - 3: IPv4 multicast topology + - 4: IPv6 multicast topology + + Data: 00 00 00 02 00 03 00 04 + """ + data_bin = bytes.fromhex('0000000200030004') + + expected = { + 'type': 'mt_id', + 'value': [0, 2, 3, 4] + } + + self.assertEqual(expected, MultiTopologyIdentifier.unpack(data_bin).dict()) + + # ==================== Exception/Abnormal Packet Tests ==================== + + def test_unpack_single_byte(self): + """Test unpack with single byte (incomplete MT-ID) + + Data: 00 (only 1 byte, MT-ID requires 2 bytes) + Expected: struct.error + """ + data_bin = bytes.fromhex('00') + + with self.assertRaises(Exception): + MultiTopologyIdentifier.unpack(data_bin) + + def test_unpack_odd_bytes_3(self): + """Test unpack with odd number of bytes (3 bytes) + + Data: 00 02 00 (3 bytes, last byte is incomplete) + - MT-ID 1: 0x0002 = 2 (parsed successfully) + - Remaining: 0x00 (incomplete, only 1 byte) + Expected: struct.error on second iteration + """ + data_bin = bytes.fromhex('000200') + + with self.assertRaises(Exception): + MultiTopologyIdentifier.unpack(data_bin) + + def test_unpack_odd_bytes_5(self): + """Test unpack with odd number of bytes (5 bytes) + + Data: 00 02 00 03 00 (5 bytes) + - MT-ID 1: 0x0002 = 2 + - MT-ID 2: 0x0003 = 3 + - Remaining: 0x00 (incomplete) + Expected: struct.error on third iteration + """ + data_bin = bytes.fromhex('0002000300') + + with self.assertRaises(Exception): + MultiTopologyIdentifier.unpack(data_bin) + + +if __name__ == "__main__": + unittest.main()