diff --git a/examples/log_consumer.py b/examples/log_consumer.py index c62ddf6..3da3326 100644 --- a/examples/log_consumer.py +++ b/examples/log_consumer.py @@ -52,6 +52,11 @@ def process_message(msg): elif t == "openbmp.parsed.unicast_prefix": unicast_prefix = UnicastPrefix(m) + + # Optional fixture writer + # with open('unicast_prefixes_message', "w+") as file: + # file.write(msg.value + '\n') + print '\n' + 'Received Message (' + t_stamp + ') : ' + m_tag + '(V: ' + str(m.version) + ')' print unicast_prefix.to_json_pretty() diff --git a/src/openbmp/api/parsed/message/Base.py b/src/openbmp/api/parsed/message/Base.py index afa2934..cb63107 100644 --- a/src/openbmp/api/parsed/message/Base.py +++ b/src/openbmp/api/parsed/message/Base.py @@ -8,6 +8,7 @@ from abc import ABCMeta, abstractmethod import json +import re class Base(object): @@ -24,6 +25,24 @@ class Base(object): __metaclass__ = ABCMeta + @staticmethod + def isplit(string, delimiter=None): + """ + Like string.split but returns an iterator (lazy and works a little bit faster) + """ + if delimiter is None: + # Handle whitespace by default + delim = r"\s" + + elif len(delimiter) != 1: + raise ValueError("Can only handle single character delimiters", delimiter) + + else: + # Escape, incase it's "\", "*" etc. + delim = re.escape(delimiter) + + return (x.group(0) for x in re.finditer(r"[^{}]+".format(delim), string)) + def __init__(self): """Initializes the class variables.""" # Default message bus specification version (max) supported @@ -55,13 +74,18 @@ def get_row_map(self): """ return self.row_map - def parse(self, version, data): + def parse(self, version, data, validate=True, required_fields=None): """ Parse TSV rows of data from message :param version: Float representation of maximum message bus specification version supported. See http://openbmp.org/#!docs/MESSAGE_BUS_API.md for more details. :param data: TSV data (MUST not include the headers) + :param validate: If required to validate every field with its corresponding processor + :param required_fields: If needed to parse only feq fields ans speed up parsing. + Example: {10: 'prefix', 11: "prefix_len"} where: + "10" and "11" - positions of fields in MESSAGE_BUS_API, + "prefix" and "prefix_len" - name of parsed fields in resulting dictionary. :return: True if error, False if no errors """ @@ -76,20 +100,28 @@ def parse(self, version, data): if len(self.header_names) == 0: raise Exception("header_names should be overriden.") - records = data.splitlines() - - # # Splits each record into fields. - for r in records: - fields = r.split('\t') # Fields of a record as array. - - # # # Process and validate each field with its corresponding processor. - if len(fields) >= len(self.processors): - for i,processor in enumerate(self.processors): - fields[i] = processor.process_value(fields[i]) - - fields_dict = dict(zip(self.header_names, fields)) - - self.row_map.append(fields_dict) + # Splits each record into fields. + for record in Base.isplit(data, "\n"): + fields = record.split('\t') # Fields of a record as array. + + fields_map = {} + + if required_fields: + for key in required_fields: + if validate: + processor_class = self.get_processors()[key] + fields_map[required_fields[key]] = processor_class.process_value(fields[key]) + else: + fields_map[required_fields[key]] = fields[key] + else: + if len(fields) >= len(self.processors): + fields_map = dict(zip(self.header_names, fields)) + if validate: + # Process and validate each field with its corresponding processor. + for (f, p, h) in zip(fields, self.get_processors(), self.header_names): + fields_map[h] = p.process_value(f) + + self.row_map.append(fields_map) def to_json(self): """ diff --git a/src/openbmp/api/parsed/message/Message.py b/src/openbmp/api/parsed/message/Message.py index 9423a3f..74f369c 100644 --- a/src/openbmp/api/parsed/message/Message.py +++ b/src/openbmp/api/parsed/message/Message.py @@ -15,11 +15,12 @@ class Message(object): TYPE_PEER = "PEER" TYPE_ROUTER = "ROUTER" - def __init__(self, data=None): + def __init__(self, data=None, parse_headers=True): """ Handle the message by parsing header of it. :param data: Raw Kafka message as string. + :param parse_headers: If headers parsing is required. May be disabled to speed up. """ if data and not data.strip(): # If "data" is not string, throws error. @@ -35,49 +36,55 @@ def __init__(self, data=None): self.content_pos = int() self.router_ip = str() + self.__parse(data, parse_headers) + if data: - self.__parse(data) + self.__parse(data, parse_headers) - def __parse(self, data): + def __parse(self, data, parse_headers=True): """ Parses header of raw Kafka messages and set the version, length, number of records and router hash id. :param data: Raw Kafka message as string. + :param parse_headers: If headers parsing is required. May be disabled to speed up. """ - data_end_pos = data.find("\n\n") - header_data = data[:data_end_pos] + if parse_headers: + data_end_pos = data.find("\n\n") + header_data = data[:data_end_pos] - self.content_pos = data_end_pos + 2 - self.content = data[self.content_pos:] + self.content_pos = data_end_pos + 2 + self.content = data[self.content_pos:] - headers = header_data.split("\n") + headers = header_data.split("\n") - for header in headers: - value = header.split(":")[1].strip() - attr = header.split(":")[0].strip() + for header in headers: + value = header.split(":")[1].strip() + attr = header.split(":")[0].strip() - # Attribute names are from http://openbmp.org/#!docs/MESSAGE_BUS_API.md headers - if attr == "V": - self.version = float(value) + # Attribute names are from http://openbmp.org/#!docs/MESSAGE_BUS_API.md headers + if attr == "V": + self.version = float(value) - elif attr == "C_HASH_ID": - self.collector_hash_id = value + elif attr == "C_HASH_ID": + self.collector_hash_id = value - elif attr == "T": - self.type = value + elif attr == "T": + self.type = value - elif attr == "L": - self.length = long(value) + elif attr == "L": + self.length = long(value) - elif attr == "R": - self.records = long(value) + elif attr == "R": + self.records = long(value) - elif attr == "R_HASH_ID": - self.router_hash_id = value + elif attr == "R_HASH_ID": + self.router_hash_id = value - elif attr == "R_IP": - self.router_ip = value + elif attr == "R_IP": + self.router_ip = value + else: + self.content = data def get_version(self): return self.version diff --git a/src/openbmp/api/parsed/message/UnicastPrefix.py b/src/openbmp/api/parsed/message/UnicastPrefix.py index 9ae9530..908cd12 100644 --- a/src/openbmp/api/parsed/message/UnicastPrefix.py +++ b/src/openbmp/api/parsed/message/UnicastPrefix.py @@ -48,11 +48,17 @@ class UnicastPrefix(Base): MsgBusFields.ORIGINATOR_ID.get_name() ] - def __init__(self, message): + def __init__(self, message, validate=True, required_fields=None): """ Handle the message by parsing it and storing the data in memory. :param message: 'Message' object. + :param validate: If required to validate every field with its corresponding processor + :param validate: If required to validate every field with its corresponding processor + :param required_fields: If needed to parse only feq fields ans speed up parsing. + Example: {10: 'prefix', 11: "prefix_len"} where: + "10" and "11" - positions of fields in MESSAGE_BUS_API, + "prefix" and "prefix_len" - name of parsed fields in resulting dictionary. """ if not isinstance(message, Message): raise TypeError("Expected Message object instead of type " + type(message)) @@ -84,7 +90,7 @@ def __init__(self, message): self.processors = self.get_processors() if data: - self.parse(version, data) + self.parse(version, data, validate=validate, required_fields=required_fields) def get_processors(self): """ diff --git a/tests/FixturesBasedTestCase.py b/tests/FixturesBasedTestCase.py new file mode 100644 index 0000000..3a8f1a1 --- /dev/null +++ b/tests/FixturesBasedTestCase.py @@ -0,0 +1,8 @@ +import unittest + + +class FixturesBasedTestCase(unittest.TestCase): + + def setUp(self): + with open('fixtures/unicast_prefixes_message', 'r') as unicast_prefix_message_file: + self.unicast_prefix_message = unicast_prefix_message_file.read() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/unicast_prefixes_message b/tests/fixtures/unicast_prefixes_message new file mode 100644 index 0000000..581d90b --- /dev/null +++ b/tests/fixtures/unicast_prefixes_message @@ -0,0 +1,7 @@ +V: 1.5 +C_HASH_ID: 9ae8148974c9ca01ec9271753426d214 +L: 290976 +R: 2 + +add 465863 2783130a95f3ee3c238f38eb1b898b81 eee2f394c09f96c2453bf83989eaa1a2 172.20.0.1 d651b1d601e34826247374cb1430c91c 7aefefa6df92a1fccf2fd8a50d8900de 12.12.12.12 1403 2017-06-22 23:33:04.101567 12.12.12.0 20 1 igp 1403 999 777 3 9829 14.14.14.14 0 0 14032:1299 0 1 0 1 1 +add 465864 b1c8a35dcec465d155e6d31f190153fa eee2f394c09f96c2453bf83989eaa1a2 172.20.0.1 d651b1d601e34826247374cb1430c91c 7aefefa6df92a1fccf2fd8a50d8900de 12.12.123.123 1403 2017-06-22 23:33:04.101567 12.12.123.0 22 1 igp 1403 999 777 3 9829 14.14.14.144 0 0 14032:1299 0 1 0 1 1 \ No newline at end of file diff --git a/tests/test_message.py b/tests/test_message.py new file mode 100644 index 0000000..6ea523e --- /dev/null +++ b/tests/test_message.py @@ -0,0 +1,28 @@ +from FixturesBasedTestCase import FixturesBasedTestCase + +from openbmp.api.parsed.message import Message + + +class MessageTest(FixturesBasedTestCase): + + def test_headers_parsing(self): + """ + Test default Message headers parsing + """ + message = Message(self.unicast_prefix_message) + + self.assertEqual(1.5, message.version) + self.assertEqual("9ae8148974c9ca01ec9271753426d214", message.collector_hash_id) + self.assertEqual(290976, message.length) + self.assertEqual(2, message.records) + + def test_disabled_headers_parsing(self): + """ + Test disabled Message headers parsing + """ + message = Message(self.unicast_prefix_message, parse_headers=False) + + self.assertEqual(0.0, message.version) + self.assertEqual("", message.collector_hash_id) + self.assertEqual(0, message.length) + self.assertEqual(0, message.records) diff --git a/tests/test_unicast_prefix.py b/tests/test_unicast_prefix.py new file mode 100644 index 0000000..f476336 --- /dev/null +++ b/tests/test_unicast_prefix.py @@ -0,0 +1,51 @@ +from FixturesBasedTestCase import FixturesBasedTestCase + +from openbmp.api.parsed.message import Message, UnicastPrefix + + +class UnicastPrefixTest(FixturesBasedTestCase): + + def test_default_parsing(self): + """ + Test default UnicastPrefix parsing + """ + message = Message(self.unicast_prefix_message) + unicast_prefixes = UnicastPrefix(message) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual(2, len(row_map)) + + self.assertEqual("12.12.12.0", row_map[0]['prefix']) + self.assertEqual(20, row_map[0]['prefix_len']) + + self.assertEqual("12.12.123.0", row_map[1]['prefix']) + self.assertEqual(22, row_map[1]['prefix_len']) + + def test_parsing_without_validation(self): + """ + If disable validation, it should still produce the same output but + numerical fields should be serializes as String + """ + message = Message(self.unicast_prefix_message) + unicast_prefixes = UnicastPrefix(message, validate=False) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual("20", row_map[0]['prefix_len']) + + def test_parsing_with_required_fields(self): + """ + Test how UnicastPrefix works with custom required_fields parameter + """ + message = Message(self.unicast_prefix_message) + + # Without validation + unicast_prefixes = UnicastPrefix(message, validate=False, required_fields={11: "my_custom_prefix_name"}) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual('20', row_map[0]['my_custom_prefix_name']) + + # With validation + unicast_prefixes = UnicastPrefix(message, validate=True, required_fields={11: "my_custom_prefix_name"}) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual(20, row_map[0]['my_custom_prefix_name'])