Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 25 additions & 22 deletions example/mybmpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,31 +82,34 @@ def on_connection_lost(self, peer_host, peer_port):
except Exception as e:
LOG.info(e)

def on_message_received(self, peer_host, peer_port, msg, msg_type):
def on_message_received(self, peer_host, peer_port, msg, msg_type, data_type, length):
"""process for message received
"""
if msg_type in [0, 1, 4, 5, 6]:
return
elif msg_type in [2, 3]:
self.puber.declare_queue(name='yabmp_%s' % peer_host)
self.puber.declare_exchange(_exchange='yabmp_%s' % peer_host, _type='direct')
self.puber.bind_queue(_exchange='yabmp_%s' % peer_host, _queue='yabmp_%s' % peer_host)
peer_ip = msg[0]['addr']
msg_body = {
"type": msg_type,
"data": {
"time": time.time(),
"client_ip": peer_host,
"client_port": peer_port,
"bgp_peer_ip": peer_ip
if 'parse' in data_type:
if msg_type in [0, 1, 4, 5, 6]:
return
elif msg_type in [2, 3]:
self.puber.declare_queue(name='yabmp_%s' % peer_host)
self.puber.declare_exchange(_exchange='yabmp_%s' % peer_host, _type='direct')
self.puber.bind_queue(_exchange='yabmp_%s' % peer_host, _queue='yabmp_%s' % peer_host)
peer_ip = msg[0]['addr']
msg_body = {
"type": msg_type,
"data": {
"time": time.time(),
"client_ip": peer_host,
"client_port": peer_port,
"bgp_peer_ip": peer_ip
}
}
}
self.puber.publish_message(
_exchange='yabmp_%s' % peer_host,
_routing_key='yabmp_%s' % peer_host,
_body=msg_body)
else:
return
self.puber.publish_message(
_exchange='yabmp_%s' % peer_host,
_routing_key='yabmp_%s' % peer_host,
_body=msg_body)
else:
return
elif 'raw_data' in data_type:
pass


def cli_opts_register():
Expand Down
4 changes: 4 additions & 0 deletions yabmp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@
default=20000,
help='Port the bind the BMP server to')
]

DATA_OPTS = [
cfg.ListOpt('type', default='parse', help='raw_data, parse'),
]
34 changes: 25 additions & 9 deletions yabmp/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,28 @@
import struct
import traceback
from twisted.internet import protocol
from oslo_config import cfg

from yabmp.common import constants as bmp_cons
from yabmp.common import exception as excp
from yabmp.message.bmp import BMPMessage

CONF = cfg.CONF

LOG = logging.getLogger()


class BMP(protocol.Protocol):
"""
BGP Monitoring Protocol
"""
cap_dict = {}

def __init__(self):

LOG.info('Building a new BGP protocol instance')
self.receive_buffer = b''
self.message = BMPMessage()
self.bgp_peer_dict = {}
self.message = BMPMessage(BMP.cap_dict)
self.client_ip = None
self.client_port = None

Expand Down Expand Up @@ -113,25 +116,38 @@ def parse_buffer(self):
# the hold message does not comming yet.
return False
msg_value = buf[6:length]
raw_msg = buf[:length]
self.message.msg_type = msg_type
LOG.debug('Received BMP message, type=%s' % msg_type)
self.message.raw_body = msg_value
LOG.debug('Decoding message...')
try:
results = self.message.consume()
if results:
# write msg file
# results = self.message.consume(self.client_ip)
# if results:
# # write msg file
# self.factory.handler.on_message_received(
# self.client_ip, self.client_port, results, msg_type)
# else:
# LOG.error('decoding message failed.')

if 'parse' in CONF.data.type:
results = self.message.consume(self.client_ip)
if results:
self.factory.handler.on_message_received(
self.client_ip, self.client_port, results, msg_type, CONF.data.type, length)
else:
LOG.error('decoding message failed.')
if 'raw_data' in CONF.data.type:
self.factory.handler.on_message_received(
self.client_ip, self.client_port, results, msg_type)
else:
LOG.error('decoding message failed.')
self.client_ip, self.client_port, raw_msg, msg_type, CONF.data.type, length)

except Exception as e:
LOG.error(e)
error_str = traceback.format_exc()
LOG.debug(error_str)
LOG.debug('Finished decoding.')
self.message = BMPMessage()

self.message = BMPMessage(BMP.cap_dict)
LOG.debug('-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+')
self.receive_buffer = self.receive_buffer[length:]
return True
2 changes: 1 addition & 1 deletion yabmp/handler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def on_connection_lost(self, peer_host, peer_port):
raise NotImplementedError()

@abc.abstractmethod
def on_message_received(self, msg, msg_type):
def on_message_received(self, peer_host, peer_port, msg, msg_type, data_type, length):
"""process for message received
"""
raise NotImplementedError()
121 changes: 72 additions & 49 deletions yabmp/handler/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# under the License.

import os
import shutil
import logging
import sys
import time
Expand Down Expand Up @@ -55,6 +56,15 @@ def init(self):
LOG.error(e, exc_info=True)
sys.exit()

# mv history msg to back directory
for fpath, dirname, fnames in os.walk(CONF.message.write_dir):
if fnames:
if not os.path.exists(fpath.replace('/msg/', '/history_msg/')):
os.makedirs(fpath.replace('/msg/', '/history_msg/'))
old_file_path = "%s/%s" % (fpath, fnames[0])
new_file_path = old_file_path.replace('/msg/', '/history_msg/')
shutil.move(old_file_path, new_file_path)

def on_connection_made(self, peer_host, peer_port):
"""process for connection made
"""
Expand All @@ -68,57 +78,70 @@ def on_connection_lost(self, peer_host, peer_port):
"""
pass

def on_message_received(self, peer_host, peer_port, msg, msg_type):
def on_message_received(self, peer_host, peer_port, msg, msg_type, data_type, length):
"""process for message received
"""
if msg_type in [4, 5, 6]:
return
peer_ip = msg[0]['addr']
if peer_ip not in self.bgp_peer_dict:
self.bgp_peer_dict[peer_ip] = {}
peer_msg_path = os.path.join(
os.path.join(CONF.message.write_dir, peer_host), peer_ip)
if not os.path.exists(peer_msg_path):
# this peer first come out
# create a peer path and open the first message file
os.makedirs(peer_msg_path)
LOG.info('Create directory for peer: %s' % peer_msg_path)
msg_file_name = os.path.join(peer_msg_path, '%s.msg' % time.time())
self.bgp_peer_dict[peer_ip]['msg_seq'] = 1
else:
# this peer is not first come out
# find the latest message file and get the last message sequence number
file_list = os.listdir(peer_msg_path)
file_list.sort()
msg_file_name = os.path.join(peer_msg_path, file_list[-1])
self.bgp_peer_dict[peer_ip]['msg_seq'] = self.get_last_seq(msg_file_name)
self.bgp_peer_dict[peer_ip]['file'] = open(msg_file_name, 'a')
if msg_type == 0: # route monitoring message
if msg[0]['flags']['L']:
# pos-policy RIB
msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 130, msg[1], (1, 1)]
else:
# pre-policy RIB
msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], msg[1][0], msg[1][1], (1, 1)]
self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_ip]['file'].flush()
elif msg_type == 1: # statistic message
msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 129, msg[1], (0, 0)]
self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_ip]['file'].flush()
elif msg_type == 2: # peer down message
msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 3, msg[1], (0, 0)]
self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_ip]['file'].flush()

elif msg_type == 3: # peer up message
msg_list = [time.time(), self.bgp_peer_dict[peer_ip]['msg_seq'], 1, msg[1]['received_open_msg'], (0, 0)]
self.bgp_peer_dict[peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_ip]['file'].flush()
if 'parse' in data_type:
if msg_type in [4, 5, 6]:
return
peer_ip = msg[0]['addr']
if peer_host not in self.bgp_peer_dict.keys():
self.bgp_peer_dict[peer_host] = {}
if peer_ip not in self.bgp_peer_dict[peer_host].keys():
self.bgp_peer_dict[peer_host][peer_ip] = {}
peer_msg_path = os.path.join(
os.path.join(CONF.message.write_dir, peer_host), peer_ip)
if not os.path.exists(peer_msg_path):
# this peer first come out
# create a peer path and open the first message file
os.makedirs(peer_msg_path)
LOG.info('Create directory for peer: %s' % peer_msg_path)
msg_file_name = os.path.join(peer_msg_path, '%s.msg' % time.time())
self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] = 1
else:
# this peer is not first come out
# find the latest message file and get the last message sequence number
file_list = os.listdir(peer_msg_path)
if not file_list:
LOG.info('Create directory for peer: %s' % peer_msg_path)
msg_file_name = os.path.join(peer_msg_path, '%s.msg' % time.time())
self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] = 1
else:
file_list.sort()
msg_file_name = os.path.join(peer_msg_path, file_list[-1])
self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] = self.get_last_seq(msg_file_name)
self.bgp_peer_dict[peer_host][peer_ip]['file'] = open(msg_file_name, 'a')
if msg_type == 0: # route monitoring message
if msg[0]['flags']['L']:
# pos-policy RIB
msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 130, msg[1], (1, 1)]
else:
# pre-policy RIB
msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'],
msg[1][0], msg[1][1], (1, 1)]
self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_host][peer_ip]['file'].flush()
elif msg_type == 1: # statistic message
msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 129, msg[1], (0, 0)]
self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_host][peer_ip]['file'].flush()
elif msg_type == 2: # peer down message
msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 3, msg[1], (0, 0)]
self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_host][peer_ip]['file'].flush()

elif msg_type == 3: # peer up message
msg_list = [time.time(), self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'], 1,
msg[1]['received_open_msg'], (0, 0)]
self.bgp_peer_dict[peer_host][peer_ip]['file'].write(str(msg_list) + '\n')
self.bgp_peer_dict[peer_host][peer_ip]['msg_seq'] += 1
self.bgp_peer_dict[peer_host][peer_ip]['file'].flush()

if 'raw_data' in data_type:
pass

@staticmethod
def get_last_seq(file_name):
Expand Down
27 changes: 19 additions & 8 deletions yabmp/message/bmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ class BMPMessage(object):
definition of BMP message and methons used to decode message.
"""

def __init__(self):
def __init__(self, cap_dict):

self.version = None
self.msg_type = None
self.raw_body = None
self.msg_body = None
self.peers_cap = cap_dict

@staticmethod
def parse_per_peer_header(raw_peer_header):
Expand Down Expand Up @@ -123,7 +124,7 @@ def parse_per_peer_header(raw_peer_header):
return per_header_dict

@staticmethod
def parse_route_monitoring_msg(msg):
def parse_route_monitoring_msg(msg, asn4=False):
"""
Route Monitoring messages are used for initial synchronization of
ADJ-RIBs-In. They are also used for ongoing monitoring of received
Expand All @@ -139,7 +140,7 @@ def parse_route_monitoring_msg(msg):
msg = msg[bgp_cons.HDR_LEN:]
if bgp_msg_type == 2:
# decode update message
results = Update().parse(None, msg, asn4=True)
results = Update().parse(None, msg, asn4=asn4)
if results['sub_error']:
LOG.error('error: decode update message error!, error code: %s' % results['sub_error'])
LOG.error('Raw data: %s' % repr(results['hex']))
Expand All @@ -160,7 +161,7 @@ def parse_route_monitoring_msg(msg):
'safi': bgp_route_refresh_msg[2]}

@staticmethod
def parse_route_mirroring_msg(msg):
def parse_route_mirroring_msg(msg, asn4=False):
"""
Route Mirroring messages are used for verbatim duplication of
messages as received. Following the common BMP header and per-peer
Expand All @@ -187,7 +188,7 @@ def parse_route_mirroring_msg(msg):
bgp_msg_body = mirror_value[bgp_cons.HDR_LEN:]
if bgp_msg_type == 2:
# Update message
bgp_update_msg = Update().parse(None, bgp_msg_body, asn4=True)
bgp_update_msg = Update().parse(None, bgp_msg_body, asn4=asn4)
if bgp_update_msg['sub_error']:
LOG.error('error: decode update message error!, error code: %s' % bgp_update_msg['sub_error'])
LOG.error('Raw data: %s' % repr(bgp_update_msg['hex']))
Expand Down Expand Up @@ -425,22 +426,32 @@ def parse_termination_msg(msg):
LOG.info('termination message = %s' % msg_dict)
return msg_dict

def consume(self):
def consume(self, client_ip):

if self.msg_type in [0, 1, 2, 3, 6]:
try:
per_peer_header = self.parse_per_peer_header(self.raw_body[0:42])
self.msg_body = self.raw_body[42:]
if self.msg_type == 0:
return per_peer_header, self.parse_route_monitoring_msg(self.msg_body)
peer_ip = per_peer_header.get('addr', None)
asn4 = self.peers_cap[client_ip][peer_ip].get("capabilities", {}).get('four_bytes_as', False)
return per_peer_header, self.parse_route_monitoring_msg(self.msg_body, asn4=asn4)
elif self.msg_type == 1:
return per_peer_header, self.parse_statistic_report_msg(self.msg_body)
elif self.msg_type == 2:
return per_peer_header, self.parse_peer_down_notification(self.msg_body)
elif self.msg_type == 3:
peer_ip = per_peer_header.get('addr', None)
peer_up_msg = self.parse_peer_up_notification(self.msg_body, per_peer_header['flags'])
recirved_open = peer_up_msg.get("received_open_msg", {})
if self.peers_cap.get(client_ip, None) is None:
self.peers_cap[client_ip] = {}
self.peers_cap[client_ip][peer_ip] = recirved_open
return per_peer_header, self.parse_peer_up_notification(self.msg_body, per_peer_header['flags'])
elif self.msg_type == 6:
return per_peer_header, self.parse_route_mirroring_msg(self.msg_body)
peer_ip = per_peer_header.get('addr', None)
asn4 = self.peers_cap[client_ip][peer_ip].get("capabilities", {}).get('four_bytes_as', False)
return per_peer_header, self.parse_route_mirroring_msg(self.msg_body, asn4=asn4)
except Exception as e:
LOG.error(e)
error_str = traceback.format_exc()
Expand Down
1 change: 1 addition & 0 deletions yabmp/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
CONF = cfg.CONF

CONF.register_cli_opts(config.bmp_options)
CONF.register_cli_opts(config.DATA_OPTS, group='data')

LOG = logging.getLogger(__name__)

Expand Down