From 80a4db64d981a8bba685112334f1b8a06ea9dc34 Mon Sep 17 00:00:00 2001 From: Changjie Gao Date: Wed, 11 Jun 2025 10:14:30 +0800 Subject: [PATCH 1/3] Add local_add_path parameter --- yabgp/config.py | 13 +- yabgp/core/protocol.py | 36 +- yabgp/message/open.py | 25 +- .../unit/message/attribute/test_add_path.py | 328 ++++++++++++++++++ yabgp/tests/unit/message/test_open.py | 2 +- 5 files changed, 379 insertions(+), 25 deletions(-) create mode 100644 yabgp/tests/unit/message/attribute/test_add_path.py diff --git a/yabgp/config.py b/yabgp/config.py index d198367..09c0dd9 100644 --- a/yabgp/config.py +++ b/yabgp/config.py @@ -41,9 +41,6 @@ cfg.BoolOpt('enhanced_route_refresh', default=True, help='If support enhanced route refresh'), - cfg.StrOpt('add_path', - choices=['ipv4_send', 'ipv4_receive', 'ipv4_both'], - help='BGP additional path feature and supported address family'), cfg.BoolOpt('graceful_restart', default=True, help='if support graceful restart'), @@ -80,7 +77,10 @@ 'rib', default=False, help='maintain rib in or not, default is False' - ) + ), + cfg.ListOpt('local_add_path', + default=['ipv4_both'], + help='The Local BGP add path configuration') ] CONF.register_cli_opts(BGP_PEER_CONFIG_OPTS, group='bgp') @@ -134,7 +134,10 @@ def get_bgp_config(): 'enhanced_route_refresh': CONF.bgp.enhanced_route_refresh, 'graceful_restart': CONF.bgp.graceful_restart, 'cisco_multi_session': CONF.bgp.cisco_multi_session, - 'add_path': CONF.bgp.add_path + 'add_path': [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in CONF.bgp.local_add_path + ] }, 'remote': {} } diff --git a/yabgp/core/protocol.py b/yabgp/core/protocol.py index 0af071a..a7b5a7e 100644 --- a/yabgp/core/protocol.py +++ b/yabgp/core/protocol.py @@ -58,6 +58,7 @@ def __init__(self): self.adj_rib_in = {k: {} for k in CONF.bgp.afi_safi} self.adj_rib_out = {k: {} for k in CONF.bgp.afi_safi} self.adj_rib_in_ipv4_tree = Radix() + self.afi_add_path = None # statistic self.msg_sent_stat = { @@ -263,7 +264,7 @@ def _update_received(self, timestamp, msg): """Called when a BGP Update message was received.""" # TODO: Need to convert `self.add_path_ipv4_receive` and `self.add_path_ipv4_send` into a unified # `afi_add_path` format. - result = Update().parse(timestamp, msg, self.fourbytesas, afi_add_path={}) + result = Update().parse(timestamp, msg, self.fourbytesas, afi_add_path=self.afi_add_path) if result['sub_error']: msg = { 'attr': result['attr'], @@ -470,11 +471,9 @@ def send_open(self): # check add path feature, send add path condition: # local support send or both # remote support receive or both - if cfg.CONF.bgp.running_config['capability']['local']['add_path'] in \ - ['ipv4_send', 'ipv4_both']: - if cfg.CONF.bgp.running_config['capability']['remote'].get('add_path') in \ - ['ipv4_receive', 'ipv4_both']: - self.add_path_ipv4_send = True + local_add_path = cfg.CONF.bgp.running_config['capability']['local']['add_path'] + remote_add_path = cfg.CONF.bgp.running_config['capability']['remote'].get('add_path') + self.afi_add_path = self.compare_add_path(local_add_path, remote_add_path) # send message self.transport.write(open_msg) self.msg_sent_stat['Opens'] += 1 @@ -519,11 +518,9 @@ def _open_received(self, timestamp, msg): if key == 'four_bytes_as': self.fourbytesas = True elif key == 'add_path': - if cfg.CONF.bgp.running_config['capability']['remote']['add_path'] in \ - ['ipv4_send', 'ipv4_both']: - if cfg.CONF.bgp.running_config['capability']['local']['add_path'] in \ - ['ipv4_receive', 'ipv4_both']: - self.add_path_ipv4_receive = True + self.afi_add_path = self.compare_add_path( + cfg.CONF.bgp.running_config['capability']['local']['add_path'], + cfg.CONF.bgp.running_config['capability']['remote'].get('add_path')) LOG.info("--%s = %s", key, cfg.CONF.bgp.running_config['capability']['remote'][key]) @@ -853,3 +850,20 @@ def update_receive_verion(self, attr, nlri, withdraw): del self.mpls_vpn_receive_dict[str(key)] else: LOG.info("Do not have %s in receive mpls_vpn dict" % key) + + def compare_add_path(self, local_add_path, remote_add_path): + if not local_add_path or not remote_add_path: + return None + + afi_add_path = {} + for local in local_add_path: + for remote in remote_add_path: + if local.get('afi_safi') == remote.get('afi_safi'): + local_send_receive = local.get('send/receive') + remote_send_receive = remote.get('send/receive') + if local_send_receive in ['receive', 'both'] and remote_send_receive in ['send', 'both']: + afi_add_path[local.get('afi_safi')] = True + else: + afi_add_path[local.get('afi_safi')] = False + + return afi_add_path diff --git a/yabgp/message/open.py b/yabgp/message/open.py index 8c661f2..c2f07e4 100644 --- a/yabgp/message/open.py +++ b/yabgp/message/open.py @@ -20,6 +20,7 @@ from yabgp.common import exception as excp from yabgp.common import constants as bgp_cons +from yabgp.common.constants import AFI_SAFI_STR_DICT class Open(object): @@ -420,9 +421,13 @@ def construct(self, my_capability=None): return afisafi # for add path elif self.capa_code == self.ADD_PATH: - afi_safi, value = convert_addpath_str_to_int(self.capa_value) + addpath_int_list = convert_addpath_str_to_int(self.capa_value) + length = len(addpath_int_list) * 4 add_path = struct.pack( - '!BBBBHBB', 2, 6, self.ADD_PATH, self.capa_length, afi_safi[0], afi_safi[1], value) + '!BBBB', 2, length + 1 + 1, self.ADD_PATH, length) + for (afi_safi, value) in addpath_int_list: + afi, safi = afi_safi + add_path += struct.pack('!HBB', afi, safi, value) return add_path # (10) Extended Next Hop Encoding Capability @@ -445,10 +450,14 @@ def construct(self, my_capability=None): return struct.pack('!BBBB', 2, 2, self.ENHANCED_ROUTE_REFRESH, 0) -def convert_addpath_str_to_int(addpath_str): - addpath_dict = { - 'ipv4_receive': [(1, 1), 1], - 'ipv4_send': [(1, 1), 2], - 'ipv4_both': [(1, 1), 3] +def convert_addpath_str_to_int(addpath_list): + addpath_int_list = [] + send_receive_dict = { + 'receive': 1, + 'send': 2, + 'both': 3 } - return addpath_dict[addpath_str] + for addpath in addpath_list: + addpath_int_list.append([AFI_SAFI_STR_DICT[addpath['afi_safi']], send_receive_dict[addpath['send/receive']]]) + + return addpath_int_list diff --git a/yabgp/tests/unit/message/attribute/test_add_path.py b/yabgp/tests/unit/message/attribute/test_add_path.py new file mode 100644 index 0000000..4abdea4 --- /dev/null +++ b/yabgp/tests/unit/message/attribute/test_add_path.py @@ -0,0 +1,328 @@ +# Copyright 2015 Cisco Systems, Inc. +# All rights reserved. +# +# Licensed under the Apache License, version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Test Add Path Capability""" + +import unittest +from yabgp.message.open import convert_addpath_str_to_int + + +class TestAddPathCapability(unittest.TestCase): + """ + Test cases for Add Path capability introduced in commits 1a17665 and 814598b. + These tests cover: + 1. convert_addpath_str_to_int function + 2. Add path capability parsing and construction + 3. Add path negotiation logic + """ + + def setUp(self): + """Set up test fixtures""" + self.maxDiff = None + + def test_convert_addpath_str_to_int_single_receive(self): + """ + Test converting a single AFI/SAFI in receive mode + """ + addpath_list = [ + {'afi_safi': 'ipv4', 'send/receive': 'receive'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(1, 1), 1]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_single_send(self): + """ + Test converting a single AFI/SAFI in send mode + """ + addpath_list = [ + {'afi_safi': 'ipv4', 'send/receive': 'send'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(1, 1), 2]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_single_both(self): + """ + Test converting a single AFI/SAFI in both mode + """ + addpath_list = [ + {'afi_safi': 'ipv4', 'send/receive': 'both'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(1, 1), 3]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_multiple_afi_safi(self): + """ + Test converting multiple AFI/SAFI entries + """ + addpath_list = [ + {'afi_safi': 'ipv4', 'send/receive': 'both'}, + {'afi_safi': 'ipv6', 'send/receive': 'receive'}, + {'afi_safi': 'vpnv4', 'send/receive': 'send'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [ + [(1, 1), 3], # ipv4, both + [(2, 1), 1], # ipv6, receive + [(1, 128), 2] # vpnv4, send + ] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_ipv4_lu(self): + """ + Test converting IPv4 Labeled Unicast + """ + addpath_list = [ + {'afi_safi': 'ipv4_lu', 'send/receive': 'both'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(1, 4), 3]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_ipv6_lu(self): + """ + Test converting IPv6 Labeled Unicast + """ + addpath_list = [ + {'afi_safi': 'ipv6_lu', 'send/receive': 'receive'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(2, 4), 1]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_vpnv6(self): + """ + Test converting VPNv6 + """ + addpath_list = [ + {'afi_safi': 'vpnv6', 'send/receive': 'both'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(2, 128), 3]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_flowspec(self): + """ + Test converting Flowspec + """ + addpath_list = [ + {'afi_safi': 'flowspec', 'send/receive': 'send'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(1, 133), 2]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_empty_list(self): + """ + Test converting empty list + """ + addpath_list = [] + result = convert_addpath_str_to_int(addpath_list) + expected = [] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_mixed_families(self): + """ + Test converting mixed address families (IPv4, IPv6, VPN, Labeled Unicast) + """ + addpath_list = [ + {'afi_safi': 'ipv4', 'send/receive': 'both'}, + {'afi_safi': 'ipv6', 'send/receive': 'both'}, + {'afi_safi': 'ipv4_lu', 'send/receive': 'receive'}, + {'afi_safi': 'ipv6_lu', 'send/receive': 'receive'}, + {'afi_safi': 'vpnv4', 'send/receive': 'send'}, + {'afi_safi': 'vpnv6', 'send/receive': 'send'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [ + [(1, 1), 3], # ipv4, both + [(2, 1), 3], # ipv6, both + [(1, 4), 1], # ipv4_lu, receive + [(2, 4), 1], # ipv6_lu, receive + [(1, 128), 2], # vpnv4, send + [(2, 128), 2] # vpnv6, send + ] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_evpn(self): + """ + Test converting EVPN + """ + addpath_list = [ + {'afi_safi': 'evpn', 'send/receive': 'both'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(25, 70), 3]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_bgpls(self): + """ + Test converting BGP-LS + """ + addpath_list = [ + {'afi_safi': 'bgpls', 'send/receive': 'receive'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(16388, 71), 1]] + self.assertEqual(expected, result) + + def test_convert_addpath_str_to_int_sr_policy(self): + """ + Test converting SR Policy + """ + addpath_list = [ + {'afi_safi': 'ipv4_srte', 'send/receive': 'both'} + ] + result = convert_addpath_str_to_int(addpath_list) + expected = [[(1, 73), 3]] + self.assertEqual(expected, result) + + +class TestLocalAddPathConfig(unittest.TestCase): + """ + Test cases for local_add_path configuration processing in config.py + Tests the list comprehension that converts local_add_path strings to dictionaries + """ + + def test_local_add_path_single_ipv4_both(self): + """ + Test processing single IPv4 both mode configuration + """ + # Simulate the list comprehension from config.py line 137-139 + local_add_path_config = ['ipv4_both'] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [ + {'afi_safi': 'ipv4', 'send/receive': 'both'} + ] + self.assertEqual(expected, result) + + def test_local_add_path_single_ipv4_receive(self): + """ + Test processing single IPv4 receive mode configuration + """ + local_add_path_config = ['ipv4_receive'] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [ + {'afi_safi': 'ipv4', 'send/receive': 'receive'} + ] + self.assertEqual(expected, result) + + def test_local_add_path_single_ipv4_send(self): + """ + Test processing single IPv4 send mode configuration + """ + local_add_path_config = ['ipv4_send'] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [ + {'afi_safi': 'ipv4', 'send/receive': 'send'} + ] + self.assertEqual(expected, result) + + def test_local_add_path_multiple_families(self): + """ + Test processing multiple address families configuration + """ + local_add_path_config = ['ipv4_both', 'ipv6_receive', 'vpnv4_send'] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [ + {'afi_safi': 'ipv4', 'send/receive': 'both'}, + {'afi_safi': 'ipv6', 'send/receive': 'receive'}, + {'afi_safi': 'vpnv4', 'send/receive': 'send'} + ] + self.assertEqual(expected, result) + + def test_local_add_path_labeled_unicast(self): + """ + Test processing Labeled Unicast address family configuration + Note: ipv4_lu has two underscores, rsplit('_', 1) only splits the last one + """ + local_add_path_config = ['ipv4_lu_both', 'ipv6_lu_receive'] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [ + {'afi_safi': 'ipv4_lu', 'send/receive': 'both'}, + {'afi_safi': 'ipv6_lu', 'send/receive': 'receive'} + ] + self.assertEqual(expected, result) + + def test_local_add_path_empty_list(self): + """ + Test processing empty configuration list + """ + local_add_path_config = [] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [] + self.assertEqual(expected, result) + + def test_local_add_path_all_modes(self): + """ + Test all modes (send, receive, both) + """ + local_add_path_config = ['ipv4_send', 'ipv6_receive', 'vpnv4_both'] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [ + {'afi_safi': 'ipv4', 'send/receive': 'send'}, + {'afi_safi': 'ipv6', 'send/receive': 'receive'}, + {'afi_safi': 'vpnv4', 'send/receive': 'both'} + ] + self.assertEqual(expected, result) + + def test_local_add_path_complex_afi_safi_names(self): + """ + Test complex address family names (containing multiple underscores) + """ + local_add_path_config = [ + 'ipv4_lu_both', + 'ipv6_lu_receive', + 'ipv4_srte_send', + 'ipv6_flowspec_both' + ] + result = [ + {'afi_safi': item.rsplit('_', 1)[0], 'send/receive': item.rsplit('_', 1)[1]} + for item in local_add_path_config + ] + expected = [ + {'afi_safi': 'ipv4_lu', 'send/receive': 'both'}, + {'afi_safi': 'ipv6_lu', 'send/receive': 'receive'}, + {'afi_safi': 'ipv4_srte', 'send/receive': 'send'}, + {'afi_safi': 'ipv6_flowspec', 'send/receive': 'both'} + ] + self.assertEqual(expected, result) + + +if __name__ == '__main__': + unittest.main() diff --git a/yabgp/tests/unit/message/test_open.py b/yabgp/tests/unit/message/test_open.py index fec3f6a..85585ae 100644 --- a/yabgp/tests/unit/message/test_open.py +++ b/yabgp/tests/unit/message/test_open.py @@ -67,7 +67,7 @@ def test_construct_add_path(self): my_capa = { 'cisco_route_refresh': True, 'route_refresh': True, - 'add_path': 'ipv4_receive', + 'add_path': [{'afi_safi': 'ipv4', 'send/receive': 'receive'}], 'four_bytes_as': True, 'afi_safi': [(1, 1)], 'enhanced_route_refresh': True} From 142acb2ba9c42c76e50a8b5b88db5622d8538ae8 Mon Sep 17 00:00:00 2001 From: Changjie Gao Date: Tue, 3 Feb 2026 13:49:57 +0800 Subject: [PATCH 2/3] Modify as_path parsing method --- yabgp/message/attribute/aspath.py | 50 ++++++++---- .../unit/message/attribute/test_aspath.py | 80 +++++++++++++++++-- 2 files changed, 108 insertions(+), 22 deletions(-) diff --git a/yabgp/message/attribute/aspath.py b/yabgp/message/attribute/aspath.py index 555259d..396ac7b 100644 --- a/yabgp/message/attribute/aspath.py +++ b/yabgp/message/attribute/aspath.py @@ -64,27 +64,49 @@ def parse(cls, value, asn4=False): :param asn4: 4 bytes asn or not :param value: raw binary balue """ - # Loop over all path segments aspath = [] - while len(value) > 0: - seg_type, length = struct.unpack('!BB', value[:2]) - if seg_type not in [cls.AS_SET, cls.AS_SEQUENCE, cls.AS_CONFED_SEQUENCE, cls.AS_CONFED_SET]: + offset = 0 + total_len = len(value) + + # Determine ASN format: 4 bytes ('I') or 2 bytes ('H') + asn_byte_len = 4 if asn4 else 2 + asn_fmt_char = 'I' if asn4 else 'H' + + valid_types = { + cls.AS_SET, + cls.AS_SEQUENCE, + cls.AS_CONFED_SEQUENCE, + cls.AS_CONFED_SET + } + + while offset < total_len: + if offset + 2 > total_len: + raise excep.UpdateMessageError( + sub_error=bgp_cons.ERR_MSG_UPDATE_ATTR_LEN, + data='') + + seg_type, num_ases = struct.unpack_from('!BB', value, offset) + + if seg_type not in valid_types: raise excep.UpdateMessageError( sub_error=bgp_cons.ERR_MSG_UPDATE_MALFORMED_ASPATH, - data=repr(value)) - try: - if asn4: - segment = list(struct.unpack('!%dI' % length, value[2:2 + length * 4])) - value = value[2 + length * 4:] - - else: - segment = list(struct.unpack('!%dH' % length, value[2:2 + length * 2])) - value = value[2 + length * 2:] - except Exception: + data=repr(value[offset:])) + + offset += 2 + + segment_byte_len = num_ases * asn_byte_len + + if offset + segment_byte_len > total_len: raise excep.UpdateMessageError( sub_error=bgp_cons.ERR_MSG_UPDATE_ATTR_LEN, data='') + + fmt = '!%d%s' % (num_ases, asn_fmt_char) + segment = list(struct.unpack_from(fmt, value, offset)) + aspath.append((seg_type, segment)) + offset += segment_byte_len + return aspath @classmethod diff --git a/yabgp/tests/unit/message/attribute/test_aspath.py b/yabgp/tests/unit/message/attribute/test_aspath.py index 0294727..065a029 100644 --- a/yabgp/tests/unit/message/attribute/test_aspath.py +++ b/yabgp/tests/unit/message/attribute/test_aspath.py @@ -19,40 +19,104 @@ from yabgp.common.exception import UpdateMessageError from yabgp.common.constants import ERR_MSG_UPDATE_MALFORMED_ASPATH +from yabgp.common.constants import ERR_MSG_UPDATE_ATTR_LEN from yabgp.message.attribute.aspath import ASPath class TestASPATH(unittest.TestCase): - def test_parse(self): + def test_parse_empty(self): as_path = ASPath.parse(value=b'') self.assertEqual(as_path, []) + def test_parse_asn2(self): # 2bytes ASN - as_path = ASPath.parse(value=b'\x02\x04\x0c\xb9y3\x88 S\xd9') + # Segment Type: 2 (AS_SEQUENCE) + # Length: 4 ASNs + # ASNs: 3257, 31027, 34848, 21465 + data = b'\x02\x04\x0c\xb9y3\x88 S\xd9' + as_path = ASPath.parse(value=data) self.assertEqual(as_path, [(2, [3257, 31027, 34848, 21465])]) + def test_parse_asn4(self): # 4bytes ASN - as_path = ASPath.parse(value=b'\x02\x04\x00\x00\x0c\xb9\x00\x00y3\x00\x00\x88 \x00\x00S\xd9', - asn4=True) + # Segment Type: 2 (AS_SEQUENCE) + # Length: 4 ASNs + # ASNs: 3257, 31027, 34848, 21465 + data = b'\x02\x04\x00\x00\x0c\xb9\x00\x00y3\x00\x00\x88 \x00\x00S\xd9' + as_path = ASPath.parse(value=data, asn4=True) self.assertEqual(as_path, [(2, [3257, 31027, 34848, 21465])]) - # MALFORMED_ASPATH + def test_parse_mixed_asn2_asn4_mismatch(self): + # If we try to parse 4-byte ASN data as 2-byte ASN (default), it should fail or produce garbage. + # In this specific case, it hits an invalid segment type in the second "perceived" segment. + data = b'\x02\x04\x00\x00\x0c\xb9\x00\x00y3\x00\x00\x88 \x00\x00S\xd9' try: - ASPath.parse(value=b'\x02\x04\x00\x00\x0c\xb9\x00\x00y3\x00\x00\x88 \x00\x00S\xd9') + ASPath.parse(value=data, asn4=False) except UpdateMessageError as e: self.assertEqual(e.sub_error, ERR_MSG_UPDATE_MALFORMED_ASPATH) + + def test_parse_malformed_type(self): + # Invalid Segment Type 5 + data = b'\x05\x04\x0c\xb9y3\x88 S\xd9' try: - ASPath.parse(value=b'\x05\x04\x0c\xb9y3\x88 S\xd9') + ASPath.parse(value=data) except UpdateMessageError as e: self.assertEqual(e.sub_error, ERR_MSG_UPDATE_MALFORMED_ASPATH) + def test_parse_truncated_header(self): + # Only 1 byte provided, need 2 for header + data = b'\x02' + try: + ASPath.parse(value=data) + except UpdateMessageError as e: + self.assertEqual(e.sub_error, ERR_MSG_UPDATE_ATTR_LEN) + + def test_parse_truncated_body_asn2(self): + # Header says 1 ASN (2 bytes), but only 1 byte provided + # Type: 2, Count: 1 -> Need 2 bytes of body + data = b'\x02\x01\x00' + try: + ASPath.parse(value=data, asn4=False) + except UpdateMessageError as e: + self.assertEqual(e.sub_error, ERR_MSG_UPDATE_ATTR_LEN) + + def test_parse_truncated_body_asn4(self): + # Header says 1 ASN (4 bytes), but only 3 bytes provided + # Type: 2, Count: 1 -> Need 4 bytes of body + data = b'\x02\x01\x00\x00\x00' + try: + ASPath.parse(value=data, asn4=True) + except UpdateMessageError as e: + self.assertEqual(e.sub_error, ERR_MSG_UPDATE_ATTR_LEN) + def test_parse_as_set_as_federate(self): as_path = ASPath.parse(value=b'\x04\x02\x03\xe9\x03\xea\x03\x02\x03\xeb\x03\xec') self.assertEqual(as_path, [(4, [1001, 1002]), (3, [1003, 1004])]) - def test_construct(self): + def test_parse_all_segment_types(self): + # Cover all 4 types in one path + # 1. AS_SET (1), len=1, val=[100] + # 2. AS_SEQUENCE (2), len=1, val=[200] + # 3. AS_CONFED_SEQUENCE (3), len=1, val=[300] + # 4. AS_CONFED_SET (4), len=1, val=[400] + # 2-byte ASN + data = ( + b'\x01\x01\x00\x64' # Type 1, Len 1, AS 100 + b'\x02\x01\x00\xc8' # Type 2, Len 1, AS 200 + b'\x03\x01\x01\x2c' # Type 3, Len 1, AS 300 + b'\x04\x01\x01\x90' # Type 4, Len 1, AS 400 + ) + as_path = ASPath.parse(value=data, asn4=False) + expected = [ + (1, [100]), + (2, [200]), + (3, [300]), + (4, [400]) + ] + self.assertEqual(as_path, expected) + def test_construct(self): # 2 bytes ASN as_path = ASPath.construct(asn4=False, value=[(2, [3257, 31027, 34848, 21465])]) self.assertEqual(as_path, b'@\x02\n\x02\x04\x0c\xb9y3\x88 S\xd9') From 14a4ac56eab69f104e1de6d13cb6bea962d53b40 Mon Sep 17 00:00:00 2001 From: Changjie Gao Date: Wed, 4 Feb 2026 17:01:14 +0800 Subject: [PATCH 3/3] update unitest --- .../unit/message/attribute/test_aspath.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/yabgp/tests/unit/message/attribute/test_aspath.py b/yabgp/tests/unit/message/attribute/test_aspath.py index 065a029..c7996f6 100644 --- a/yabgp/tests/unit/message/attribute/test_aspath.py +++ b/yabgp/tests/unit/message/attribute/test_aspath.py @@ -133,6 +133,27 @@ def test_construct_as_set_as_federate(self): as_path = ASPath.construct(asn4=False, value=[(4, [1001, 1002]), (3, [1003, 1004])]) self.assertEqual(as_path, b'@\x02\x0c\x04\x02\x03\xe9\x03\xea\x03\x02\x03\xeb\x03\xec') + def test_parse_complex_asn4(self): + # 4bytes ASN with multiple segments and multiple ASNs per segment + # Segment 1: Type 2 (AS_SEQUENCE), Count 4, ASNs: [65536, 65537, 65538, 65539] + # Segment 2: Type 1 (AS_SET), Count 3, ASNs: [65540, 65541, 65542] + # Segment 3: Type 3 (AS_CONFED_SEQUENCE), Count 2, ASNs: [65543, 65544] + # Segment 4: Type 4 (AS_CONFED_SET), Count 2, ASNs: [65545, 65546] + data = ( + b'\x02\x04\x00\x01\x00\x00\x00\x01\x00\x01\x00\x01\x00\x02\x00\x01\x00\x03' + b'\x01\x03\x00\x01\x00\x04\x00\x01\x00\x05\x00\x01\x00\x06' + b'\x03\x02\x00\x01\x00\x07\x00\x01\x00\x08' + b'\x04\x02\x00\x01\x00\x09\x00\x01\x00\x0a' + ) + as_path = ASPath.parse(value=data, asn4=True) + expected = [ + (2, [65536, 65537, 65538, 65539]), + (1, [65540, 65541, 65542]), + (3, [65543, 65544]), + (4, [65545, 65546]) + ] + self.assertEqual(as_path, expected) + if __name__ == '__main__': unittest.main()