diff --git a/example/mybmpd.py b/example/mybmpd.py index d766dcb..e7cbda6 100644 --- a/example/mybmpd.py +++ b/example/mybmpd.py @@ -82,31 +82,34 @@ def on_connection_lost(self, peer_host, peer_port): except Exception as e: LOG.info(e) - def on_message_received(self, peer_host, peer_port, msg, msg_type): + def on_message_received(self, peer_host, peer_port, msg, msg_type, data_type, length): """process for message received """ - if msg_type in [0, 1, 4, 5, 6]: - return - elif msg_type in [2, 3]: - self.puber.declare_queue(name='yabmp_%s' % peer_host) - self.puber.declare_exchange(_exchange='yabmp_%s' % peer_host, _type='direct') - self.puber.bind_queue(_exchange='yabmp_%s' % peer_host, _queue='yabmp_%s' % peer_host) - peer_ip = msg[0]['addr'] - msg_body = { - "type": msg_type, - "data": { - "time": time.time(), - "client_ip": peer_host, - "client_port": peer_port, - "bgp_peer_ip": peer_ip + if 'parse' in data_type: + if msg_type in [0, 1, 4, 5, 6]: + return + elif msg_type in [2, 3]: + self.puber.declare_queue(name='yabmp_%s' % peer_host) + self.puber.declare_exchange(_exchange='yabmp_%s' % peer_host, _type='direct') + self.puber.bind_queue(_exchange='yabmp_%s' % peer_host, _queue='yabmp_%s' % peer_host) + peer_ip = msg[0]['addr'] + msg_body = { + "type": msg_type, + "data": { + "time": time.time(), + "client_ip": peer_host, + "client_port": peer_port, + "bgp_peer_ip": peer_ip + } } - } - self.puber.publish_message( - _exchange='yabmp_%s' % peer_host, - _routing_key='yabmp_%s' % peer_host, - _body=msg_body) - else: - return + self.puber.publish_message( + _exchange='yabmp_%s' % peer_host, + _routing_key='yabmp_%s' % peer_host, + _body=msg_body) + else: + return + elif 'raw_data' in data_type: + pass def cli_opts_register(): diff --git a/yabmp/config.py b/yabmp/config.py index d6193a5..8e239f5 100644 --- a/yabmp/config.py +++ b/yabmp/config.py @@ -28,3 +28,7 @@ default=20000, help='Port the bind the BMP server to') ] + +DATA_OPTS = [ + cfg.ListOpt('type', default='parse', help='raw_data, parse'), +] diff --git a/yabmp/core/protocol.py b/yabmp/core/protocol.py index b1ae841..8e9a3d1 100644 --- a/yabmp/core/protocol.py +++ b/yabmp/core/protocol.py @@ -17,11 +17,14 @@ import struct import traceback from twisted.internet import protocol +from oslo_config import cfg from yabmp.common import constants as bmp_cons from yabmp.common import exception as excp from yabmp.message.bmp import BMPMessage +CONF = cfg.CONF + LOG = logging.getLogger() @@ -29,13 +32,13 @@ class BMP(protocol.Protocol): """ BGP Monitoring Protocol """ + cap_dict = {} def __init__(self): LOG.info('Building a new BGP protocol instance') self.receive_buffer = b'' - self.message = BMPMessage() - self.bgp_peer_dict = {} + self.message = BMPMessage(BMP.cap_dict) self.client_ip = None self.client_port = None @@ -113,25 +116,38 @@ def parse_buffer(self): # the hold message does not comming yet. return False msg_value = buf[6:length] + raw_msg = buf[:length] self.message.msg_type = msg_type LOG.debug('Received BMP message, type=%s' % msg_type) self.message.raw_body = msg_value LOG.debug('Decoding message...') try: - results = self.message.consume() - if results: - # write msg file + # results = self.message.consume(self.client_ip) + # if results: + # # write msg file + # self.factory.handler.on_message_received( + # self.client_ip, self.client_port, results, msg_type) + # else: + # LOG.error('decoding message failed.') + + if 'parse' in CONF.data.type: + results = self.message.consume(self.client_ip) + if results: + self.factory.handler.on_message_received( + self.client_ip, self.client_port, results, msg_type, CONF.data.type, length) + else: + LOG.error('decoding message failed.') + if 'raw_data' in CONF.data.type: self.factory.handler.on_message_received( - self.client_ip, self.client_port, results, msg_type) - else: - LOG.error('decoding message failed.') + self.client_ip, self.client_port, raw_msg, msg_type, CONF.data.type, length) except Exception as e: LOG.error(e) error_str = traceback.format_exc() LOG.debug(error_str) LOG.debug('Finished decoding.') - self.message = BMPMessage() + + self.message = BMPMessage(BMP.cap_dict) LOG.debug('-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+') self.receive_buffer = self.receive_buffer[length:] return True diff --git a/yabmp/handler/__init__.py b/yabmp/handler/__init__.py index 3c44ce7..c719110 100644 --- a/yabmp/handler/__init__.py +++ b/yabmp/handler/__init__.py @@ -48,7 +48,7 @@ def on_connection_lost(self, peer_host, peer_port): raise NotImplementedError() @abc.abstractmethod - def on_message_received(self, msg, msg_type): + def on_message_received(self, peer_host, peer_port, msg, msg_type, data_type, length): """process for message received """ raise NotImplementedError() diff --git a/yabmp/handler/default.py b/yabmp/handler/default.py index bf08266..72e4a53 100644 --- a/yabmp/handler/default.py +++ b/yabmp/handler/default.py @@ -14,6 +14,7 @@ # under the License. import os +import shutil import logging import sys import time @@ -55,6 +56,15 @@ def init(self): LOG.error(e, exc_info=True) sys.exit() + # mv history msg to back directory + for fpath, dirname, fnames in os.walk(CONF.message.write_dir): + if fnames: + if not os.path.exists(fpath.replace('/msg/', '/history_msg/')): + os.makedirs(fpath.replace('/msg/', '/history_msg/')) + old_file_path = "%s/%s" % (fpath, fnames[0]) + new_file_path = old_file_path.replace('/msg/', '/history_msg/') + shutil.move(old_file_path, new_file_path) + def on_connection_made(self, peer_host, peer_port): """process for connection made """ @@ -68,57 +78,70 @@ def on_connection_lost(self, peer_host, peer_port): """ pass - def on_message_received(self, peer_host, peer_port, msg, msg_type): + def on_message_received(self, peer_host, peer_port, msg, msg_type, data_type, length): """process for message received """ - if msg_type in [4, 5, 6]: - return - peer_ip = msg[0]['addr'] - if peer_ip not in self.bgp_peer_dict: - self.bgp_peer_dict[peer_ip] = {} - peer_msg_path = os.path.join( - os.path.join(CONF.message.write_dir, peer_host), peer_ip) - if not os.path.exists(peer_msg_path): - # this peer first come out - # create a peer path and open the first message file - os.makedirs(peer_msg_path) - LOG.info('Create directory for peer: %s' % peer_msg_path) - msg_file_name = os.path.join(peer_msg_path, '%s.msg' % time.time()) - self.bgp_peer_dict[peer_ip]['msg_seq'] = 1 - else: - # this peer is not first come out - # find the latest message file and get the last message sequence number - file_list = os.listdir(peer_msg_path) - file_list.sort() - msg_file_name = os.path.join(peer_msg_path, file_list[-1]) - self.bgp_peer_dict[peer_ip]['msg_seq'] = self.get_last_seq(msg_file_name) - self.bgp_peer_dict[peer_ip]['file'] = open(msg_file_name, 'a') - if msg_type == 0: # route monitoring message - if msg[0]['flags']['L']: - # pos-policy RIB - msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 130, msg[1], (1, 1)] - else: - # pre-policy RIB - msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], msg[1][0], msg[1][1], (1, 1)] - self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n') - self.bgp_peer_dict[peer_ip]['msg_seq'] += 1 - self.bgp_peer_dict[peer_ip]['file'].flush() - elif msg_type == 1: # statistic message - msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 129, msg[1], (0, 0)] - self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n') - self.bgp_peer_dict[peer_ip]['msg_seq'] += 1 - self.bgp_peer_dict[peer_ip]['file'].flush() - elif msg_type == 2: # peer down message - msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 3, msg[1], (0, 0)] - self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n') - self.bgp_peer_dict[peer_ip]['msg_seq'] += 1 - self.bgp_peer_dict[peer_ip]['file'].flush() - - elif msg_type == 3: # peer up message - msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 1, msg[1]['received_open_msg'], (0, 0)] - self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n') - self.bgp_peer_dict[peer_ip]['msg_seq'] += 1 - self.bgp_peer_dict[peer_ip]['file'].flush() + if 'parse' in data_type: + if msg_type in [4, 5, 6]: + return + peer_ip = msg[0]['addr'] + if peer_host not in self.bgp_peer_dict.keys(): + self.bgp_peer_dict[peer_host] = {} + if peer_ip not in self.bgp_peer_dict[peer_host].keys(): + self.bgp_peer_dict[peer_host][peer_ip] = {} + peer_msg_path = os.path.join( + os.path.join(CONF.message.write_dir, peer_host), peer_ip) + if not os.path.exists(peer_msg_path): + # this peer first come out + # create a peer path and open the first message file + os.makedirs(peer_msg_path) + LOG.info('Create directory for peer: %s' % peer_msg_path) + msg_file_name = os.path.join(peer_msg_path, '%s.msg' % time.time()) + self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] = 1 + else: + # this peer is not first come out + # find the latest message file and get the last message sequence number + file_list = os.listdir(peer_msg_path) + if not file_list: + LOG.info('Create directory for peer: %s' % peer_msg_path) + msg_file_name = os.path.join(peer_msg_path, '%s.msg' % time.time()) + self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] = 1 + else: + file_list.sort() + msg_file_name = os.path.join(peer_msg_path, file_list[-1]) + self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] = self.get_last_seq(msg_file_name) + self.bgp_peer_dict[peer_host][peer_ip]['file'] = open(msg_file_name, 'a') + if msg_type == 0: # route monitoring message + if msg[0]['flags']['L']: + # pos-policy RIB + msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 130, msg[1], (1, 1)] + else: + # pre-policy RIB + msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], + msg[1][0], msg[1][1], (1, 1)] + self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n') + self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1 + self.bgp_peer_dict[peer_host][peer_ip]['file'].flush() + elif msg_type == 1: # statistic message + msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 129, msg[1], (0, 0)] + self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n') + self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1 + self.bgp_peer_dict[peer_host][peer_ip]['file'].flush() + elif msg_type == 2: # peer down message + msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 3, msg[1], (0, 0)] + self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n') + self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1 + self.bgp_peer_dict[peer_host][peer_ip]['file'].flush() + + elif msg_type == 3: # peer up message + msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 1, + msg[1]['received_open_msg'], (0, 0)] + self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n') + self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1 + self.bgp_peer_dict[peer_host][peer_ip]['file'].flush() + + if 'raw_data' in data_type: + pass @staticmethod def get_last_seq(file_name): diff --git a/yabmp/message/bmp.py b/yabmp/message/bmp.py index 48f85c5..376cd41 100644 --- a/yabmp/message/bmp.py +++ b/yabmp/message/bmp.py @@ -40,12 +40,13 @@ class BMPMessage(object): definition of BMP message and methons used to decode message. """ - def __init__(self): + def __init__(self, cap_dict): self.version = None self.msg_type = None self.raw_body = None self.msg_body = None + self.peers_cap = cap_dict @staticmethod def parse_per_peer_header(raw_peer_header): @@ -123,7 +124,7 @@ def parse_per_peer_header(raw_peer_header): return per_header_dict @staticmethod - def parse_route_monitoring_msg(msg): + def parse_route_monitoring_msg(msg, asn4=False): """ Route Monitoring messages are used for initial synchronization of ADJ-RIBs-In. They are also used for ongoing monitoring of received @@ -139,7 +140,7 @@ def parse_route_monitoring_msg(msg): msg = msg[bgp_cons.HDR_LEN:] if bgp_msg_type == 2: # decode update message - results = Update().parse(None, msg, asn4=True) + results = Update().parse(None, msg, asn4=asn4) if results['sub_error']: LOG.error('error: decode update message error!, error code: %s' % results['sub_error']) LOG.error('Raw data: %s' % repr(results['hex'])) @@ -160,7 +161,7 @@ def parse_route_monitoring_msg(msg): 'safi': bgp_route_refresh_msg[2]} @staticmethod - def parse_route_mirroring_msg(msg): + def parse_route_mirroring_msg(msg, asn4=False): """ Route Mirroring messages are used for verbatim duplication of messages as received. Following the common BMP header and per-peer @@ -187,7 +188,7 @@ def parse_route_mirroring_msg(msg): bgp_msg_body = mirror_value[bgp_cons.HDR_LEN:] if bgp_msg_type == 2: # Update message - bgp_update_msg = Update().parse(None, bgp_msg_body, asn4=True) + bgp_update_msg = Update().parse(None, bgp_msg_body, asn4=asn4) if bgp_update_msg['sub_error']: LOG.error('error: decode update message error!, error code: %s' % bgp_update_msg['sub_error']) LOG.error('Raw data: %s' % repr(bgp_update_msg['hex'])) @@ -425,22 +426,32 @@ def parse_termination_msg(msg): LOG.info('termination message = %s' % msg_dict) return msg_dict - def consume(self): + def consume(self, client_ip): if self.msg_type in [0, 1, 2, 3, 6]: try: per_peer_header = self.parse_per_peer_header(self.raw_body[0:42]) self.msg_body = self.raw_body[42:] if self.msg_type == 0: - return per_peer_header, self.parse_route_monitoring_msg(self.msg_body) + peer_ip = per_peer_header.get('addr', None) + asn4 = self.peers_cap[client_ip][peer_ip].get("capabilities", {}).get('four_bytes_as', False) + return per_peer_header, self.parse_route_monitoring_msg(self.msg_body, asn4=asn4) elif self.msg_type == 1: return per_peer_header, self.parse_statistic_report_msg(self.msg_body) elif self.msg_type == 2: return per_peer_header, self.parse_peer_down_notification(self.msg_body) elif self.msg_type == 3: + peer_ip = per_peer_header.get('addr', None) + peer_up_msg = self.parse_peer_up_notification(self.msg_body, per_peer_header['flags']) + recirved_open = peer_up_msg.get("received_open_msg", {}) + if self.peers_cap.get(client_ip, None) is None: + self.peers_cap[client_ip] = {} + self.peers_cap[client_ip][peer_ip] = recirved_open return per_peer_header, self.parse_peer_up_notification(self.msg_body, per_peer_header['flags']) elif self.msg_type == 6: - return per_peer_header, self.parse_route_mirroring_msg(self.msg_body) + peer_ip = per_peer_header.get('addr', None) + asn4 = self.peers_cap[client_ip][peer_ip].get("capabilities", {}).get('four_bytes_as', False) + return per_peer_header, self.parse_route_mirroring_msg(self.msg_body, asn4=asn4) except Exception as e: LOG.error(e) error_str = traceback.format_exc() diff --git a/yabmp/service.py b/yabmp/service.py index f430013..151793c 100644 --- a/yabmp/service.py +++ b/yabmp/service.py @@ -29,6 +29,7 @@ CONF = cfg.CONF CONF.register_cli_opts(config.bmp_options) +CONF.register_cli_opts(config.DATA_OPTS, group='data') LOG = logging.getLogger(__name__)