From db9dd034daf5aee4eead88178320071c66b86d0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20=C3=96qvist?= Date: Wed, 12 Nov 2025 09:45:54 +0100 Subject: [PATCH 1/3] Produce updates to kafka --- .gitignore | 1 + nipap/nipap.conf.dist | 38 +++ nipap/nipap/__init__.py | 2 +- nipap/nipap/db_schema.py | 105 ++++++- nipap/nipap/kafka_producer.py | 331 +++++++++++++++++++++ nipap/nipap/{xmlrpc.py => nipap_xmlrpc.py} | 0 nipap/nipap/nipapd.py | 45 ++- nipap/pyproject.toml | 1 + nipap/sql/functions.plsql | 11 + nipap/sql/ip_net.plsql | 17 +- nipap/sql/triggers.plsql | 19 ++ nipap/sql/upgrade-7-8.plsql | 53 ++++ 12 files changed, 615 insertions(+), 8 deletions(-) create mode 100644 nipap/nipap/kafka_producer.py rename nipap/nipap/{xmlrpc.py => nipap_xmlrpc.py} (100%) create mode 100644 nipap/sql/upgrade-7-8.plsql diff --git a/.gitignore b/.gitignore index 7d48fab57..80692e1fc 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ docs/sphinx/_build/ nipap-www/nipap_www.egg-info nipap/nipap.egg-info +/.idea/ diff --git a/nipap/nipap.conf.dist b/nipap/nipap.conf.dist index 6cfce6686..9b7e062d8 100644 --- a/nipap/nipap.conf.dist +++ b/nipap/nipap.conf.dist @@ -212,3 +212,41 @@ secret_key = {{WWW_SECRET_KEY}} # otlp_http_endpoint=http://opentelemetry-collector:4318/v1/traces # Set sampler. Valid values are always_on, always_off, parentbased_always_on, parentbased_always_off, traceidratio and parentbased_traceidratio. Default is parentbased_always_on. # otel_traces_sampler = always_on + +# +# Kafka event producer configuration +# +[kafka] +# Enable running the external kafka producer process (true/false) +# If true, nipapd will spawn the kafka_producer as a separate process. +enabled = false + +# Comma-separated list of Kafka brokers, e.g. localhost:9092,broker2:9092 +#brokers = localhost:9092 + +# Poll interval in seconds for the kafka producer to poll the DB +#poll_interval = 2 + +# Topic prefix for produced events (defaults to "nipap.") +#topic_prefix = nipap. 
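+
+# Example: a minimal plaintext setup (all values below are illustrative,
+# not defaults):
+#
+#   [kafka]
+#   enabled = true
+#   brokers = kafka1.example.com:9092,kafka2.example.com:9092
+#   poll_interval = 2
+#   topic_prefix = nipap.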
+ +# Security protocol for Kafka connection (e.g., PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) +# Defaults to PLAINTEXT if not specified +#security_protocol = PLAINTEXT + +# SASL authentication settings (required when using SASL_PLAINTEXT or SASL_SSL) +# SASL mechanism to use (e.g., PLAIN, SCRAM-SHA-256, SCRAM-SHA-512) +#sasl_mechanism = PLAIN + +# SASL username for authentication +#sasl_username = your_username + +# SASL password for authentication +#sasl_password = your_password + +# SSL/TLS settings (used with SSL or SASL_SSL security protocols) +# Path to CA certificate file for SSL/TLS certificate verification +#ssl_cafile = /path/to/ca-cert.pem + +# Enable/disable SSL hostname verification (true/false) +#ssl_check_hostname = true diff --git a/nipap/nipap/__init__.py b/nipap/nipap/__init__.py index 655f9fc4b..a5a34316e 100644 --- a/nipap/nipap/__init__.py +++ b/nipap/nipap/__init__.py @@ -1,5 +1,5 @@ __version__ = "0.32.7" -__db_version__ = 7 +__db_version__ = 8 __author__ = "Kristian Larsson, Lukas Garberg" __author_email__ = "kll@tele2.net, lukas@spritelink.net" __copyright__ = "Copyright 2011-2014, Kristian Larsson, Lukas Garberg" diff --git a/nipap/nipap/db_schema.py b/nipap/nipap/db_schema.py index 3298e347a..0a660ac27 100644 --- a/nipap/nipap/db_schema.py +++ b/nipap/nipap/db_schema.py @@ -529,7 +529,17 @@ RETURN (part_one::bigint << 32) + part_two::bigint; END; $_$ LANGUAGE plpgsql IMMUTABLE STRICT; -""" + +CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'DELETE' THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); + ELSIF OLD IS DISTINCT FROM NEW THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql;""" ip_net = """ -------------------------------------------- @@ -538,7 +548,7 @@ -- -------------------------------------------- -COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 7'; +COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8'; CREATE EXTENSION IF NOT EXISTS ip4r; CREATE EXTENSION IF NOT EXISTS hstore; @@ -790,7 +800,21 @@ CREATE INDEX ip_net_log__prefix__index ON ip_net_log(prefix_id); CREATE INDEX ip_net_log__pool__index ON ip_net_log(pool_id); -""" +-- +-- Kafka event table and triggers +-- +-- This table is used as a queue for the external kafka_producer process. +-- Triggers on the core tables insert events here. The daemon will enable or +-- disable these triggers at startup depending on configuration. 
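+--
+-- A queued row looks roughly like this (illustrative values; the payload is
+-- simply row_to_json() of the affected row):
+--   table_name='ip_net_vrf', event_type='UPDATE',
+--   payload='{"id": 1, "rt": "209:123", ...}', processed=false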
+-- +CREATE TABLE IF NOT EXISTS kafka_produce_event ( + id SERIAL PRIMARY KEY, + table_name TEXT NOT NULL, + event_type TEXT NOT NULL, + payload JSONB, + processed BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() +);""" triggers = """ -- @@ -1768,7 +1792,25 @@ WHEN (OLD.ipv4_default_prefix_length IS DISTINCT FROM NEW.ipv4_default_prefix_length OR OLD.ipv6_default_prefix_length IS DISTINCT FROM NEW.ipv6_default_prefix_length) EXECUTE PROCEDURE tf_ip_net_pool__iu_before(); -""" + +-- Triggers that write to kafka_produce_event +CREATE TRIGGER trigger_kafka_ip_net_plan + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_plan + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_vrf + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_vrf + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_pool + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_pool + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event();""" upgrade = [ """ @@ -2272,4 +2314,59 @@ -- update database schema version COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 7'; """, +""" +-- +-- Upgrade from NIPAP database schema version 7 to 8 +-- + +-- +-- Kafka event table and triggers +-- +-- This table is used as a queue for the external kafka_producer process. +-- Triggers on the core tables insert events here. The daemon will enable or +-- disable these triggers at startup depending on configuration. +-- +CREATE TABLE IF NOT EXISTS kafka_produce_event ( + id SERIAL PRIMARY KEY, + table_name TEXT NOT NULL, + event_type TEXT NOT NULL, + payload JSONB, + processed BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() +); + +CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'DELETE' THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); + ELSIF OLD IS DISTINCT FROM NEW THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Triggers that write to kafka_produce_event +CREATE TRIGGER trigger_kafka_ip_net_plan + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_plan + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_vrf + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_vrf + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_pool + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_pool + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + + +-- update database schema version +COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8'; +""", ] diff --git a/nipap/nipap/kafka_producer.py b/nipap/nipap/kafka_producer.py new file mode 100644 index 000000000..fa04b7658 --- /dev/null +++ b/nipap/nipap/kafka_producer.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python3 +# Simple Kafka producer process for NIPAP +# This process polls the kafka_produce_event table and forwards events to Kafka. +# It is intended to be forked by nipapd. 
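+#
+# Rough outline of the event loop in run() below:
+#   1. SELECT a batch of unprocessed rows FROM kafka_produce_event
+#      (FOR UPDATE SKIP LOCKED, LIMIT 100)
+#   2. send one message per row to a per-table topic (topic_prefix plus a
+#      name derived from the table), keyed on the id of the affected row
+#   3. flush the producer
+#   4. mark the rows as processed = TRUE and COMMIT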
+ +import json +import logging +import time +from typing import Any + +try: + from kafka import KafkaProducer +except Exception: + KafkaProducer = None + +import psycopg2 +import psycopg2.extras + +from nipap.nipapconfig import NipapConfig + +LOG = logging.getLogger("nipap.kafka_producer") +LOG.addHandler(logging.NullHandler()) + + +def _build_db_args(cfg): + db_args = {} + db_args['host'] = cfg.get('nipapd', 'db_host') + db_args['database'] = cfg.get('nipapd', 'db_name') + db_args['user'] = cfg.get('nipapd', 'db_user') + db_args['password'] = cfg.get('nipapd', 'db_pass') + db_args['sslmode'] = cfg.get('nipapd', 'db_sslmode') + db_args['port'] = cfg.get('nipapd', 'db_port') + + if db_args['host'] is not None and db_args['host'] in ('', '""'): + db_args['host'] = None + for key in list(db_args.keys()): + if db_args[key] is None: + del db_args[key] + + return db_args + + +def _connect_db(cfg): + db_args = _build_db_args(cfg) + conn = None + while True: + try: + conn = psycopg2.connect(**db_args, cursor_factory=psycopg2.extras.RealDictCursor) + conn.autocommit = False + psycopg2.extras.register_hstore(conn, globally=True) + break + except Exception as e: + LOG.error("Unable to connect to DB for kafka_producer: %s. Retrying in 5s", e) + time.sleep(5) + return conn + + +def _create_kafka_producer(cfg): + if KafkaProducer is None: + LOG.error("kafka-python not installed, kafka producer disabled") + return None + + brokers = None + try: + if cfg.has_option('kafka', 'brokers'): + brokers = cfg.get('kafka', 'brokers') + except Exception: + brokers = None + + if not brokers: + LOG.error("No 'kafka.brokers' configured, kafka producer disabled") + return None + + # brokers could be comma separated list + brokers_list = [b.strip() for b in brokers.split(',') if b.strip()] + + # allow optional producer configs + producer_cfg: dict[str, Any] = {'bootstrap_servers': brokers_list, + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), + 'key_serializer': lambda k: str(k).encode('utf-8')} + + # Add SASL_SSL security protocol support + try: + if cfg.has_option('kafka', 'security_protocol'): + security_protocol = cfg.get('kafka', 'security_protocol') + + # Currently only SASL_SSL is supported + if security_protocol != 'SASL_SSL': + LOG.error("Security protocol '%s' is not supported. 
Only SASL_SSL is currently supported.", security_protocol) + return None + + producer_cfg['security_protocol'] = security_protocol + + # If using SASL_SSL, require SASL credentials + if security_protocol == 'SASL_SSL': + # Get SASL mechanism (default to PLAIN) + sasl_mechanism = 'PLAIN' + if cfg.has_option('kafka', 'sasl_mechanism'): + sasl_mechanism = cfg.get('kafka', 'sasl_mechanism') + producer_cfg['sasl_mechanism'] = sasl_mechanism + + # Get SASL username and password + if cfg.has_option('kafka', 'sasl_username'): + sasl_username = cfg.get('kafka', 'sasl_username') + producer_cfg['sasl_plain_username'] = sasl_username + else: + LOG.warning("SASL_SSL configured but sasl_username not provided") + + if cfg.has_option('kafka', 'sasl_password'): + sasl_password = cfg.get('kafka', 'sasl_password') + producer_cfg['sasl_plain_password'] = sasl_password + else: + LOG.warning("SASL_SSL configured but sasl_password not provided") + + # Optional: SSL CA certificate verification + if cfg.has_option('kafka', 'ssl_cafile'): + ssl_cafile = cfg.get('kafka', 'ssl_cafile') + producer_cfg['ssl_cafile'] = ssl_cafile + + # Optional: SSL hostname verification + if cfg.has_option('kafka', 'ssl_check_hostname'): + try: + ssl_check_hostname = cfg.getboolean('kafka', 'ssl_check_hostname') + producer_cfg['ssl_check_hostname'] = ssl_check_hostname + except Exception: + LOG.warning("Invalid value for ssl_check_hostname, using default") + except Exception as e: + LOG.warning("Failed to configure security settings: %s", e) + + try: + # additional options could be added in config later + producer = KafkaProducer(**producer_cfg) + return producer + except Exception as e: + LOG.exception("Failed to create KafkaProducer: %s", e) + return None + + +def _ensure_producer(cfg, base_delay=1, max_delay=60): + """ + Ensure a KafkaProducer is available. If broker(s) are unavailable, keep retrying + with exponential backoff until a producer can be created. If kafka-python is not + installed, return None immediately. + """ + if KafkaProducer is None: + LOG.error("kafka-python not installed; cannot create KafkaProducer") + return None + + attempt = 0 + delay = base_delay + while True: + attempt += 1 + producer = _create_kafka_producer(cfg) + if producer is not None: + LOG.info("Successfully created KafkaProducer (attempt %d)", attempt) + return producer + LOG.error("Kafka producer not available, retrying in %ds (attempt %d)", min(delay, max_delay), attempt) + time.sleep(min(delay, max_delay)) + delay = min(delay * 2, max_delay) + + +def _send_with_backoff(producer, topic, value, key=None, max_retries=10, base_delay=1, max_delay=60): + """ + Attempt to send a message via KafkaProducer, retrying with exponential backoff on failure. + Returns True if the send was initiated successfully, False otherwise. + If producer is None, returns False immediately. + """ + if producer is None: + LOG.warning("Kafka producer is not available; cannot send message to %s", topic) + return False + + attempt = 0 + delay = base_delay + while True: + try: + # producer.send is async; it may still raise on client-side errors + producer.send(topic, value, key) + return True + except Exception as e: + attempt += 1 + LOG.warning("Kafka producer send failed for topic %s (attempt %d/%d): %s", topic, attempt, max_retries, e) + if attempt >= max_retries: + LOG.exception("Exceeded max retries (%d) for sending to topic %s. 
Giving up.", max_retries, topic) + return False + # sleep before next retry with growing delay + time.sleep(min(delay, max_delay)) + delay *= 2.0 + + +def run(config_path=None): + """ + Entry point for the kafka producer process. + :param config_path: path to configuration file (will be read via NipapConfig) + """ + # configure basic logging to stderr so daemonized process has something + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)-20s %(levelname)-8s %(message)s") + + try: + cfg = NipapConfig(config_path) + except Exception as e: + LOG.error("Unable to read config %s: %s", config_path, e) + return + + LOG.info("Starting kafka producer (config: %s)", config_path) + + # poll interval seconds + poll_interval = 2 + try: + if cfg.has_option('kafka', 'poll_interval'): + poll_interval = int(cfg.get('kafka', 'poll_interval')) + except Exception: + poll_interval = 2 + + # topic prefix default + topic_prefix = "nipap." + try: + if cfg.has_option('kafka', 'topic_prefix'): + topic_prefix = cfg.get('kafka', 'topic_prefix') + except Exception: + topic_prefix = "nipap." + + conn = _connect_db(cfg) + cur = conn.cursor() + + # ensure a producer is available before entering the main loop. This will retry + # indefinitely if brokers are unavailable, ensuring the process keeps trying to connect. + producer = _ensure_producer(cfg) + if producer is None: + # _ensure_producer returns None if kafka-python isn't installed or config invalid + LOG.error("Kafka producer not available (missing dependency or bad config), exiting kafka_producer process") + return + + LOG.info("Kafka producer connected, starting event loop") + + # main loop + while True: + try: + # begin transaction + cur.execute("BEGIN;") + + # fetch a batch of unlocked events + cur.execute(""" + SELECT id, table_name, event_type, payload + FROM kafka_produce_event + WHERE processed = FALSE + ORDER BY id + FOR UPDATE SKIP LOCKED + LIMIT 100 + """) + rows = cur.fetchall() + + if not rows: + # nothing to do + conn.rollback() + time.sleep(poll_interval) + continue + + # send events to kafka + ids = [] + send_failed = False + for row in rows: + try: + event_id = row['id'] + table = row['table_name'] + etype = row['event_type'] + payload = row['payload'] + + topic = f"{topic_prefix}{table}" + message = {'event_type': etype, 'payload': payload} + # send with retries and exponential backoff + sent = _send_with_backoff(producer, topic, message, payload.get('id')) + if sent: + ids.append(event_id) + else: + # sending failed (producer might be disconnected); mark to recreate producer + LOG.error("Failed to send event id %s to topic %s; will attempt to recreate producer", event_id, topic) + send_failed = True + break + except Exception as e: + # if a single event fails to prepare, log and skip it for now + LOG.exception("Failed to send event id %s: %s", row.get('id'), e) + + if send_failed: + # rollback processing of this batch, attempt to recreate producer and retry later + try: + conn.rollback() + except Exception: + pass + # try to recreate producer (this will block until a producer is available) + LOG.info("Attempting to recreate Kafka producer after send failures") + producer = _ensure_producer(cfg) + if producer is None: + LOG.error("Unable to recreate Kafka producer; exiting kafka_producer process") + return + # back off a bit before next attempt to avoid tight loop + time.sleep(max(1, poll_interval)) + continue + + # flush to ensure delivery or at least attempt + try: + producer.flush(timeout=10) + except Exception: + 
LOG.exception("Kafka producer flush failed") + # try to recreate producer on flush failure + try: + conn.rollback() + except Exception: + pass + LOG.info("Attempting to recreate Kafka producer after flush failure") + producer = _ensure_producer(cfg) + if producer is None: + LOG.error("Unable to recreate Kafka producer; exiting kafka_producer process") + return + time.sleep(max(1, poll_interval)) + continue + + # mark processed for ids that were attempted + if ids: + cur.execute("UPDATE kafka_produce_event SET processed = TRUE WHERE id = ANY(%s);", (ids,)) + conn.commit() + else: + conn.rollback() + + except Exception as e: + LOG.exception("Unexpected error in kafka_producer loop: %s", e) + try: + conn.rollback() + except Exception: + pass + # backoff on error + time.sleep(max(1, poll_interval)) diff --git a/nipap/nipap/xmlrpc.py b/nipap/nipap/nipap_xmlrpc.py similarity index 100% rename from nipap/nipap/xmlrpc.py rename to nipap/nipap/nipap_xmlrpc.py diff --git a/nipap/nipap/nipapd.py b/nipap/nipap/nipapd.py index bd2544e69..8915cc2a3 100644 --- a/nipap/nipap/nipapd.py +++ b/nipap/nipap/nipapd.py @@ -181,6 +181,47 @@ def run(): print("ERROR:", str(exc), file=sys.stderr) sys.exit(1) + # Kafka integration: enable/disable DB triggers and start producer process + try: + kafka_enabled = False + try: + if cfg.has_section('kafka'): + kafka_enabled = cfg.getboolean('kafka', 'enabled') + except Exception: + kafka_enabled = False + + # Toggle triggers produced by the SQL install depending on config. + # If the triggers do not exist yet (fresh DB), these ALTER TABLE commands + # will simply fail and we log the warning. + try: + if kafka_enabled: + nip._execute("ALTER TABLE ip_net_plan ENABLE TRIGGER trigger_kafka_ip_net_plan;") + nip._execute("ALTER TABLE ip_net_vrf ENABLE TRIGGER trigger_kafka_ip_net_vrf;") + nip._execute("ALTER TABLE ip_net_pool ENABLE TRIGGER trigger_kafka_ip_net_pool;") + logger.info("Kafka DB triggers enabled by configuration") + else: + nip._execute("ALTER TABLE ip_net_plan DISABLE TRIGGER trigger_kafka_ip_net_plan;") + nip._execute("ALTER TABLE ip_net_vrf DISABLE TRIGGER trigger_kafka_ip_net_vrf;") + nip._execute("ALTER TABLE ip_net_pool DISABLE TRIGGER trigger_kafka_ip_net_pool;") + logger.info("Kafka DB triggers disabled by configuration") + except Exception as e: + logger.warning("Could not toggle kafka triggers (may not be installed yet): %s", e) + + # Start the external kafka producer process if configured to do so. + if kafka_enabled: + import subprocess + python_exe = sys.executable or 'python3' + try: + # Run the producer in a separate process; pass the current config path. + cmd = [python_exe, '-c', "from nipap.kafka_producer import run; run(r'{}')".format(args.config_file)] + subprocess.Popen(cmd) + logger.info("Launched kafka_producer subprocess") + except Exception as e: + logger.error("Failed to start kafka_producer subprocess: %s", e) + except Exception: + # Do not abort startup if kafka integration fails for any reason. 
+ logger.exception("Unexpected error while setting up kafka integration, continuing startup") + if args.dbversion: print("nipap db schema:", nip._get_db_version()) sys.exit(0) @@ -310,8 +351,8 @@ def run(): import nipap.rest rest = nipap.rest.setup(app) - import nipap.xmlrpc - nipapxml = nipap.xmlrpc.setup(app) + import nipap.nipap_xmlrpc + nipapxml = nipap.nipap_xmlrpc.setup(app) if not cfg.getboolean('nipapd', 'foreground'): # If we are not running in the foreground, remove current handlers which diff --git a/nipap/pyproject.toml b/nipap/pyproject.toml index 183dd0ffa..de689be43 100644 --- a/nipap/pyproject.toml +++ b/nipap/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "python-dateutil==2.8.2", "pytz==2024.2", "pyjwt==2.10.1", + "kafka-python==2.0.2", "tornado==6.5", # "docutils==0.21.2" ] diff --git a/nipap/sql/functions.plsql b/nipap/sql/functions.plsql index 7f6473b92..40252fce0 100644 --- a/nipap/sql/functions.plsql +++ b/nipap/sql/functions.plsql @@ -523,3 +523,14 @@ BEGIN RETURN (part_one::bigint << 32) + part_two::bigint; END; $_$ LANGUAGE plpgsql IMMUTABLE STRICT; + +CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'DELETE' THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); + ELSIF OLD IS DISTINCT FROM NEW THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/nipap/sql/ip_net.plsql b/nipap/sql/ip_net.plsql index 7ba7211d8..bca849e4f 100644 --- a/nipap/sql/ip_net.plsql +++ b/nipap/sql/ip_net.plsql @@ -4,7 +4,7 @@ -- -------------------------------------------- -COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 7'; +COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8'; CREATE EXTENSION IF NOT EXISTS ip4r; CREATE EXTENSION IF NOT EXISTS hstore; @@ -256,3 +256,18 @@ CREATE INDEX ip_net_log__vrf__index ON ip_net_log(vrf_id); CREATE INDEX ip_net_log__prefix__index ON ip_net_log(prefix_id); CREATE INDEX ip_net_log__pool__index ON ip_net_log(pool_id); +-- +-- Kafka event table and triggers +-- +-- This table is used as a queue for the external kafka_producer process. +-- Triggers on the core tables insert events here. The daemon will enable or +-- disable these triggers at startup depending on configuration. 
+-- +CREATE TABLE IF NOT EXISTS kafka_produce_event ( + id SERIAL PRIMARY KEY, + table_name TEXT NOT NULL, + event_type TEXT NOT NULL, + payload JSONB, + processed BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() +); \ No newline at end of file diff --git a/nipap/sql/triggers.plsql b/nipap/sql/triggers.plsql index 0e7bf34d8..a03bbe9e7 100644 --- a/nipap/sql/triggers.plsql +++ b/nipap/sql/triggers.plsql @@ -973,3 +973,22 @@ CREATE TRIGGER trigger_ip_net_pool__u_before WHEN (OLD.ipv4_default_prefix_length IS DISTINCT FROM NEW.ipv4_default_prefix_length OR OLD.ipv6_default_prefix_length IS DISTINCT FROM NEW.ipv6_default_prefix_length) EXECUTE PROCEDURE tf_ip_net_pool__iu_before(); + +-- Triggers that write to kafka_produce_event +CREATE TRIGGER trigger_kafka_ip_net_plan + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_plan + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_vrf + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_vrf + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_pool + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_pool + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); \ No newline at end of file diff --git a/nipap/sql/upgrade-7-8.plsql b/nipap/sql/upgrade-7-8.plsql new file mode 100644 index 000000000..0b23951e9 --- /dev/null +++ b/nipap/sql/upgrade-7-8.plsql @@ -0,0 +1,53 @@ +-- +-- Upgrade from NIPAP database schema version 7 to 8 +-- + +-- +-- Kafka event table and triggers +-- +-- This table is used as a queue for the external kafka_producer process. +-- Triggers on the core tables insert events here. The daemon will enable or +-- disable these triggers at startup depending on configuration. +-- +CREATE TABLE IF NOT EXISTS kafka_produce_event ( + id SERIAL PRIMARY KEY, + table_name TEXT NOT NULL, + event_type TEXT NOT NULL, + payload JSONB, + processed BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() +); + +CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'DELETE' THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); + ELSIF OLD IS DISTINCT FROM NEW THEN + INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Triggers that write to kafka_produce_event +CREATE TRIGGER trigger_kafka_ip_net_plan + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_plan + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_vrf + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_vrf + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + +CREATE TRIGGER trigger_kafka_ip_net_pool + AFTER INSERT OR UPDATE OR DELETE + ON ip_net_pool + FOR EACH ROW + EXECUTE PROCEDURE tf_kafka_produce_event(); + + +-- update database schema version +COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8'; From 02ac455f6c5c40d997ceeddb38d518a426e9a8fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20=C3=96qvist?= Date: Mon, 17 Nov 2025 08:26:16 +0100 Subject: [PATCH 2/3] Prettify kafka topic names --- nipap/nipap/kafka_producer.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/nipap/nipap/kafka_producer.py b/nipap/nipap/kafka_producer.py index fa04b7658..1664835ef 100644 --- a/nipap/nipap/kafka_producer.py +++ b/nipap/nipap/kafka_producer.py 
@@ -186,6 +186,16 @@ def _send_with_backoff(producer, topic, value, key=None, max_retries=10, base_de time.sleep(min(delay, max_delay)) delay *= 2.0 +def _table_to_topic(topic_prefix, table): + if (table == "ip_net_plan"): + return topic_prefix + "prefix" + elif (table == "ip_net_vrf"): + return topic_prefix + "vrf" + elif (table == "ip_net_pool"): + return topic_prefix + "pool" + else: + raise ValueError("Unknown table for kafka topic mapping: %s" % table) + def run(config_path=None): """ @@ -265,7 +275,7 @@ def run(config_path=None): etype = row['event_type'] payload = row['payload'] - topic = f"{topic_prefix}{table}" + topic = _table_to_topic(topic_prefix, table) message = {'event_type': etype, 'payload': payload} # send with retries and exponential backoff sent = _send_with_backoff(producer, topic, message, payload.get('id')) From a15a6906d79bfeb7dbf03c82d5057015715a7d8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20=C3=96qvist?= Date: Mon, 17 Nov 2025 10:56:01 +0100 Subject: [PATCH 3/3] Use kafka-python-ng --- nipap/nipap/kafka_producer.py | 8 ++++---- nipap/pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nipap/nipap/kafka_producer.py b/nipap/nipap/kafka_producer.py index 1664835ef..46e5740f5 100644 --- a/nipap/nipap/kafka_producer.py +++ b/nipap/nipap/kafka_producer.py @@ -57,7 +57,7 @@ def _connect_db(cfg): def _create_kafka_producer(cfg): if KafkaProducer is None: - LOG.error("kafka-python not installed, kafka producer disabled") + LOG.error("kafka-python-ng not installed, kafka producer disabled") return None brokers = None @@ -139,11 +139,11 @@ def _create_kafka_producer(cfg): def _ensure_producer(cfg, base_delay=1, max_delay=60): """ Ensure a KafkaProducer is available. If broker(s) are unavailable, keep retrying - with exponential backoff until a producer can be created. If kafka-python is not + with exponential backoff until a producer can be created. If kafka-python-ng is not installed, return None immediately. """ if KafkaProducer is None: - LOG.error("kafka-python not installed; cannot create KafkaProducer") + LOG.error("kafka-python-ng not installed; cannot create KafkaProducer") return None attempt = 0 @@ -236,7 +236,7 @@ def run(config_path=None): # indefinitely if brokers are unavailable, ensuring the process keeps trying to connect. producer = _ensure_producer(cfg) if producer is None: - # _ensure_producer returns None if kafka-python isn't installed or config invalid + # _ensure_producer returns None if kafka-python-ng isn't installed or config invalid LOG.error("Kafka producer not available (missing dependency or bad config), exiting kafka_producer process") return diff --git a/nipap/pyproject.toml b/nipap/pyproject.toml index de689be43..52e2a06e6 100644 --- a/nipap/pyproject.toml +++ b/nipap/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "python-dateutil==2.8.2", "pytz==2024.2", "pyjwt==2.10.1", - "kafka-python==2.0.2", + "kafka-python-ng==2.2.3", "tornado==6.5", # "docutils==0.21.2" ]
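
For reviewers: below is a minimal consumer sketch showing how the events produced by
this series can be read. It is illustrative only and not part of any patch; it assumes
the kafka-python-ng client from PATCH 3/3, the topic names introduced in PATCH 2/3,
the default "nipap." topic prefix and a broker on localhost:9092.

    import json

    from kafka import KafkaConsumer

    # Topics are <topic_prefix> + "prefix"/"vrf"/"pool", so with the default
    # prefix: nipap.prefix, nipap.vrf and nipap.pool.
    consumer = KafkaConsumer(
        "nipap.prefix", "nipap.vrf", "nipap.pool",
        bootstrap_servers=["localhost:9092"],   # should match kafka.brokers in nipap.conf
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        auto_offset_reset="earliest",
    )

    for msg in consumer:
        # Each message value is {"event_type": "INSERT"/"UPDATE"/"DELETE",
        # "payload": <row_to_json() of the affected row>}.
        event = msg.value
        print(msg.topic, event["event_type"], (event["payload"] or {}).get("id"))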