From f210e37a8c16d9a9f3c4b6348b2b454af257368f Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Mon, 28 Jul 2025 15:48:12 +0200 Subject: [PATCH 1/8] feat: add source EID support in recvmsg (mocked with node_id=10, service_id=10) Signed-off-by: Sylvain Pierrot --- bp_socket/af_bp.c | 17 ++++++++ bp_socket/af_bp.h | 6 +++ bp_socket/bp_genl.c | 81 ++++++++++++++++++++--------------- bp_socket/bp_genl.h | 10 +++-- daemon/bp_genl_handlers.c | 89 ++++++++++++++++++++++++++------------- daemon/bp_genl_handlers.h | 4 +- daemon/ion.c | 37 +++++++++------- daemon/ion.h | 16 ++++--- include/bp_socket.h | 7 +-- receiver.c | 15 +++++-- 10 files changed, 184 insertions(+), 98 deletions(-) diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 794fc7e..465ce65 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -289,6 +289,7 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) struct sock* sk; struct bp_sock* bp; struct sk_buff* skb; + struct sockaddr_bp* src_addr; int ret; sk = sock->sk; @@ -329,6 +330,22 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } + src_addr->bp_family = AF_BP; + src_addr->bp_scheme = BP_SCHEME_IPN; + src_addr->bp_addr.ipn.node_id = BP_SKB_CB(skb)->src_node_id; + src_addr->bp_addr.ipn.service_id = BP_SKB_CB(skb)->src_service_id; + + if (msg->msg_name && msg->msg_namelen >= sizeof(struct sockaddr_bp)) { + memcpy(msg->msg_name, &src_addr, sizeof(struct sockaddr_bp)); + msg->msg_namelen = sizeof(struct sockaddr_bp); // important + } else if (msg->msg_name) { + pr_warn( + "bp_recvmsg: user msg_name buffer too small (%u bytes)\n", + msg->msg_namelen); + ret = -EINVAL; + goto out; + } + if (copy_to_iter(skb->data, skb->len, &msg->msg_iter) != skb->len) { pr_err("bp_recvmsg: failed to copy data to user space\n"); ret = -EFAULT; diff --git a/bp_socket/af_bp.h b/bp_socket/af_bp.h index 842f080..de109da 100644 --- a/bp_socket/af_bp.h +++ b/bp_socket/af_bp.h @@ -4,7 +4,13 @@ #include #include +struct bp_skb_cb { + u_int32_t src_node_id; + u_int32_t src_service_id; +}; + #define bp_sk(ptr) container_of(ptr, struct bp_sock, sk) +#define BP_SKB_CB(skb) ((struct bp_skb_cb*)((skb)->cb)) extern struct hlist_head bp_list; extern rwlock_t bp_list_lock; diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 485a81b..93bc99b 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -5,9 +5,10 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = { [BP_GENL_A_UNSPEC] = { .type = NLA_UNSPEC }, - [BP_GENL_A_SOCKID] = { .type = NLA_U64 }, - [BP_GENL_A_NODE_ID] = { .type = NLA_U32 }, - [BP_GENL_A_SERVICE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_SRC_NODE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_SRC_SERVICE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_DEST_NODE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 }, [BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY }, }; @@ -36,8 +37,8 @@ struct genl_family genl_fam = { .n_mcgrps = ARRAY_SIZE(genl_mcgrps), }; -int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, - u_int32_t service_id, int port_id) +int send_bundle_doit(void* payload, int payload_size, u_int32_t dest_node_id, + u_int32_t dest_service_id, int port_id) { void* msg_head; struct sk_buff* msg; @@ -62,21 +63,26 @@ int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, goto err_free; } - ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); if (ret) { - pr_err("send_bundle: failed to put NODE_ID (%d)\n", ret); + pr_err( + "send_bundle: failed to put BP_GENL_A_DEST_NODE_ID (%d)\n", + ret); goto err_cancel; } - ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); if (ret) { - pr_err("send_bundle: failed to put SERVICE_ID (%d)\n", ret); + pr_err("send_bundle: failed to put BP_GENL_A_DEST_SERVICE_ID " + "(%d)\n", + ret); goto err_cancel; } ret = nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload); if (ret) { - pr_err("send_bundle: failed to put PAYLOAD (%d)\n", ret); + pr_err( + "send_bundle: failed to put BP_GENL_A_PAYLOAD (%d)\n", ret); goto err_cancel; } @@ -91,7 +97,8 @@ int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, return ret; } -int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) +int request_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id) { void* msg_head; struct sk_buff* msg; @@ -114,15 +121,19 @@ int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) goto err_free; } - ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); if (ret) { - pr_err("request_bundle: failed to put NODE_ID (%d)\n", ret); + pr_err("request_bundle: failed to put BP_GENL_A_DEST_NODE_ID " + "(%d)\n", + ret); goto err_cancel; } - ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); if (ret) { - pr_err("request_bundle: failed to put SERVICE_ID (%d)\n", ret); + pr_err("request_bundle: failed to put " + "BP_GENL_A_DEST_SERVICE_ID (%d)\n", + ret); goto err_cancel; } @@ -143,20 +154,15 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) struct bp_sock* bp; struct sk_buff* new_skb; bool new_skb_queued = false; - u_int32_t node_id, service_id; + u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; void* payload; size_t payload_len; int ret; - if (!info->attrs[BP_GENL_A_NODE_ID] - || !info->attrs[BP_GENL_A_SERVICE_ID] - || !info->attrs[BP_GENL_A_PAYLOAD]) { - pr_err("deliver_bundle: missing required attributes\n"); - ret = -EINVAL; - goto out; - } - node_id = nla_get_u32(info->attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(info->attrs[BP_GENL_A_SERVICE_ID]); + dest_node_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_NODE_ID]); + dest_service_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_SERVICE_ID]); + src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]); + src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]); payload = nla_data(info->attrs[BP_GENL_A_PAYLOAD]); payload_len = nla_len(info->attrs[BP_GENL_A_PAYLOAD]); @@ -167,6 +173,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) goto out; } skb_put_data(new_skb, payload, payload_len); + BP_SKB_CB(new_skb)->src_node_id = src_node_id; + BP_SKB_CB(new_skb)->src_service_id = src_service_id; read_lock_bh(&bp_list_lock); sk_for_each(sk, &bp_list) @@ -174,8 +182,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) bh_lock_sock(sk); bp = bp_sk(sk); - if (bp->bp_node_id == node_id - && bp->bp_service_id == service_id) { + if (bp->bp_node_id == dest_node_id + && bp->bp_service_id == dest_service_id) { skb_queue_tail(&bp->queue, new_skb); new_skb_queued = true; @@ -189,8 +197,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) read_unlock_bh(&bp_list_lock); if (!new_skb_queued) { - pr_err("deliver_bundle: no socket found for service ID %d\n", - service_id); + pr_err("deliver_bundle: no socket found (ion:%d.%d)\n", + dest_node_id, dest_service_id); ret = -ENODEV; goto err_free; } @@ -203,7 +211,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) return ret; } -int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) +int destroy_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id) { void* msg_head; struct sk_buff* msg; @@ -226,15 +235,19 @@ int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) goto err_free; } - ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); if (ret) { - pr_err("destroy_bundle: failed to put NODE_ID (%d)\n", ret); + pr_err("destroy_bundle: failed to put BP_GENL_A_DEST_NODE_ID " + "(%d)\n", + ret); goto err_cancel; } - ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); if (ret) { - pr_err("destroy_bundle: failed to put SERVICE_ID (%d)\n", ret); + pr_err("destroy_bundle: failed to put " + "BP_GENL_A_DEST_SERVICE_ID (%d)\n", + ret); goto err_cancel; } diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index d3cfb1d..f37d622 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -5,10 +5,12 @@ extern struct genl_family genl_fam; -int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, - u_int32_t service_id, int port_id); +int send_bundle_doit(void* payload, int payload_size, u_int32_t dest_node_id, + u_int32_t dest_service_id, int port_id); int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); -int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id); -int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id); +int request_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); +int destroy_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); #endif \ No newline at end of file diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index e5860c5..00a911a 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -20,8 +20,10 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { u_int32_t node_id, service_id; char eid[64]; int eid_size; + int err = 0; - if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] || + !attrs[BP_GENL_A_DEST_SERVICE_ID]) { log_error( "handle_send_bundle: missing attribute(s) in SEND_BUNDLE command (payload, node ID, " "service ID)"); @@ -30,8 +32,8 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { payload = nla_data(attrs[BP_GENL_A_PAYLOAD]); payload_size = nla_len(attrs[BP_GENL_A_PAYLOAD]); - node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id) + 1; if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { @@ -40,10 +42,20 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { return -EINVAL; } - log_info("[ipn:%u.%u] SEND_BUNDLE: sending bundle to EID %s, size %d (bytes)", eid, - payload_size, node_id, service_id); + err = bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size); + if (err < 0) { + log_error("[ipn:%u.%u] handle_send_bundle: bp_send_to_eid failed with error %d", node_id, + service_id, err); + goto out; + } - return bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size); + log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %d (bytes)", node_id, service_id, + eid, payload_size); + + return 0; + +out: + return err; } int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { @@ -51,15 +63,15 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { struct thread_args *args; u_int32_t node_id, service_id; - if (!attrs[BP_GENL_A_SERVICE_ID] || !attrs[BP_GENL_A_NODE_ID]) { + if (!attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_DEST_NODE_ID]) { log_error("handle_request_bundle: missing attribute(s) in REQUEST_BUNDLE " "command (service " "ID, node ID)"); return -EINVAL; } - node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); args = malloc(sizeof(struct thread_args)); if (!args) { @@ -111,10 +123,11 @@ void *handle_recv_thread(struct thread_args *args) { args->node_id, args->service_id, payload_size); } - err = handle_deliver_bundle(payload, payload_size, args); + err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, payload, payload_size, + args->node_id, args->service_id, args->node_id, args->service_id); if (err < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", err, args->node_id, - args->service_id); + log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", args->node_id, + args->service_id, err); } out: @@ -123,60 +136,76 @@ void *handle_recv_thread(struct thread_args *args) { return NULL; } -int handle_deliver_bundle(void *payload, int payload_size, struct thread_args *args) { +int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, + int payload_size, u_int32_t src_node_id, u_int32_t src_service_id, + u_int32_t dest_node_id, u_int32_t dest_service_id) { struct nl_msg *msg = NULL; void *hdr; int ret; msg = nlmsg_alloc(); if (!msg) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to allocate Netlink msg", - args->node_id, args->service_id); + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to allocate Netlink msg", dest_node_id, + dest_service_id); ret = -ENOMEM; goto out; } - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, args->netlink_family, 0, 0, + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); if (!hdr) { log_error("[ipn:%u.%u] handle_deliver_bundle: failed to create Netlink header", - args->node_id, args->service_id); + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add NODE_ID attribute", + dest_node_id, dest_service_id); ret = -EMSGSIZE; goto err_free_msg; } - if (nla_put_u32(msg, BP_GENL_A_SERVICE_ID, args->service_id) < 0) { + if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id) < 0) { log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SERVICE_ID attribute", - args->node_id, args->service_id); + dest_node_id, dest_service_id); ret = -EMSGSIZE; goto err_free_msg; } - if (nla_put_u32(msg, BP_GENL_A_NODE_ID, args->node_id) < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add NODE_ID attribute", - args->node_id, args->service_id); + if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SRC_NODE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SRC_SERVICE_ID attribute", + src_node_id, src_service_id); ret = -EMSGSIZE; goto err_free_msg; } if (nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) { - log_error("[ipn:%u.%u] [ipn:%u.%u] handle_deliver_bundle: failed to add PAYLOAD attribute", - args->node_id, args->service_id); + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add PAYLOAD attribute", + dest_node_id, dest_service_id); ret = -EMSGSIZE; goto err_free_msg; } - ret = nl_send_sync(args->netlink_sock, msg); + ret = nl_send_sync(netlink_sock, msg); if (ret < 0) { log_warn("[ipn:%u.%u] DELIVER_BUNDLE: bundle not delivered to kernel, keeping reference in " "memory (no active BP socket " "client)", - args->node_id, args->service_id); + dest_node_id, dest_service_id); ret = 0; // Do not return an error, just log it goto out; } - log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", args->node_id, args->service_id); + log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", dest_node_id, dest_service_id); return 0; @@ -190,7 +219,7 @@ int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { u_int32_t node_id, service_id; int ret = 0; - if (!attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + if (!attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { log_error("handle_destroy_bundle: missing attribute(s) in DESTROY_BUNDLE " "command (node ID, " "service ID)"); @@ -198,8 +227,8 @@ int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { goto out; } - node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); ret = destroy_adu(daemon->sdr, node_id, service_id); if (ret < 0) { diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index f959ea4..f4be443 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -14,7 +14,9 @@ struct thread_args { int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); -int handle_deliver_bundle(void *payload, int payload_size, struct thread_args *args); +int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, + int payload_size, u_int32_t src_node_id, u_int32_t src_service_id, + u_int32_t dest_node_id, u_int32_t dest_service_id); int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs); void *handle_recv_thread(struct thread_args *arg); diff --git a/daemon/ion.c b/daemon/ion.c index e04dfe8..9ac1b75 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -23,7 +23,8 @@ const char *bp_result_text(BpIndResult result) { } } -int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id) { +int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, + u_int32_t src_node_id, u_int32_t src_service_id) { struct adu_reference *ref; if (pthread_mutex_lock(&adu_refs_mutex) != 0) { @@ -39,8 +40,10 @@ int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id) { } ref->adu = adu; - ref->node_id = node_id; - ref->service_id = service_id; + ref->dest_node_id = dest_node_id; + ref->dest_service_id = dest_service_id; + ref->src_node_id = src_node_id; + ref->src_service_id = src_service_id; ref->next = adu_refs; adu_refs = ref; @@ -48,18 +51,18 @@ int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id) { return 0; } -Object find_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { +Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { struct adu_reference *ref; for (ref = adu_refs; ref != NULL; ref = ref->next) { - if (ref->node_id == node_id && ref->service_id == service_id) { + if (ref->dest_node_id == dest_node_id && ref->dest_service_id == dest_service_id) { return ref->adu; } } return 0; } -int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { +int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { struct adu_reference *prev = NULL; struct adu_reference *current = adu_refs; @@ -69,7 +72,7 @@ int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { } while (current) { - if (current->node_id == node_id && current->service_id == service_id) { + if (current->dest_node_id == dest_node_id && current->dest_service_id == dest_service_id) { if (prev) { prev->next = current->next; } else { @@ -94,7 +97,7 @@ int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { } pthread_mutex_unlock(&adu_refs_mutex); - log_warn("destroy_adu: no bundle found (ipn:%u.%u)", node_id, service_id); + log_warn("destroy_adu: no bundle found (ipn:%u.%u)", dest_node_id, dest_service_id); return -ENOENT; } @@ -136,7 +139,8 @@ int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int return ret; } -void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *payload_size) { +void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, + size_t *payload_size) { BpSAP sap; BpDelivery dlv; ZcoReader reader; @@ -147,12 +151,12 @@ void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *pay char eid[64]; own_node_id = getOwnNodeNbr(); - if (node_id != own_node_id) { - log_error("bp_recv_once: node ID mismatch. Expected %u, got %u", own_node_id, node_id); + if (dest_node_id != own_node_id) { + log_error("bp_recv_once: node ID mismatch. Expected %u, got %u", own_node_id, dest_node_id); return NULL; } - adu = find_adu(sdr, node_id, service_id); + adu = find_adu(sdr, dest_node_id, dest_service_id); if (adu != 0) { *payload_size = zco_source_data_length(sdr, adu); payload = malloc(*payload_size); @@ -169,15 +173,15 @@ void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *pay return payload; } - eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id); + eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", dest_node_id, dest_service_id); if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { log_error("bp_recv_once: failed to construct EID string."); return NULL; } if (bp_open(eid, &sap) < 0) { - log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", node_id, - service_id); + log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", dest_node_id, + dest_service_id); return NULL; } @@ -191,7 +195,8 @@ void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *pay goto release_dlv; } - if (add_adu(sdr, dlv.adu, node_id, service_id) < 0) { + log_info("bp_recv_once: received bundle from %s", dlv.bundleSourceEid); + if (add_adu(sdr, dlv.adu, dest_node_id, dest_service_id, 10, 10) < 0) { log_error("bp_recv_once: failed to add bundle reference."); goto release_dlv; } diff --git a/daemon/ion.h b/daemon/ion.h index 05310e4..ea0022d 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -5,16 +5,20 @@ struct adu_reference { Object adu; - u_int32_t node_id; - u_int32_t service_id; + u_int32_t dest_node_id; + u_int32_t dest_service_id; + u_int32_t src_node_id; + u_int32_t src_service_id; struct adu_reference *next; }; -int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id); -Object find_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id); -int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id); +int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, + u_int32_t src_node_id, u_int32_t src_service_id); +Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); +int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int eid_size); -void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *payload_size); +void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, + size_t *payload_size); #endif \ No newline at end of file diff --git a/include/bp_socket.h b/include/bp_socket.h index c42f6e4..15b11eb 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -17,9 +17,10 @@ /* Generic Netlink attributes */ enum bp_genl_attrs { BP_GENL_A_UNSPEC, - BP_GENL_A_SOCKID, - BP_GENL_A_NODE_ID, - BP_GENL_A_SERVICE_ID, + BP_GENL_A_SRC_NODE_ID, + BP_GENL_A_SRC_SERVICE_ID, + BP_GENL_A_DEST_NODE_ID, + BP_GENL_A_DEST_SERVICE_ID, BP_GENL_A_PAYLOAD, __BP_GENL_A_MAX, }; diff --git a/receiver.c b/receiver.c index 3a6ed42..eaec336 100644 --- a/receiver.c +++ b/receiver.c @@ -25,6 +25,7 @@ int main(int argc, char *argv[]) { char buffer[BUFFER_SIZE]; uint32_t node_id; uint32_t service_id; + struct sockaddr_bp src_addr; int ret = 0; if (argc < 3) { @@ -71,16 +72,22 @@ int main(int argc, char *argv[]) { memset(&msg, 0, sizeof(msg)); msg.msg_iov = &iov; msg.msg_iovlen = 1; + memset(&src_addr, 0, sizeof(src_addr)); + msg.msg_name = &src_addr; + msg.msg_namelen = sizeof(src_addr); printf("Listening for incoming messages...\n"); ssize_t n = recvmsg(sfd, &msg, 0); if (n < 0) { - perror("recvmsg failed"); - ret = EXIT_FAILURE; - goto out; + perror("recvmsg failed"); + ret = EXIT_FAILURE; + goto out; } - + printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer); + printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id, + src_addr.bp_addr.ipn.service_id); + out: close(sfd); From b21167d46dff4fb6bfddf209a26e37352dfb5fd3 Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Mon, 28 Jul 2025 22:11:09 +0200 Subject: [PATCH 2/8] fix: avoid freeing SDR buffer after success invocation of bp_send Signed-off-by: Sylvain Pierrot --- bp_socket/af_bp.c | 32 ++++++++++++++++++++------------ bp_socket/bp_genl.c | 17 +++++++++++++---- bp_socket/bp_genl.h | 2 +- daemon/bp_genl_handlers.c | 19 ++++++++----------- daemon/ion.c | 5 +++-- daemon/ion.h | 2 +- 6 files changed, 46 insertions(+), 31 deletions(-) diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 465ce65..143c0c8 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -170,21 +170,29 @@ int bp_release(struct socket* sock) struct sock* sk = sock->sk; struct bp_sock* bp; - if (sk) { - lock_sock(sk); - sock_orphan(sk); - bp = bp_sk(sk); - - write_lock_bh(&bp_list_lock); - sk_del_node_init(sk); - write_unlock_bh(&bp_list_lock); - skb_queue_purge(&bp->queue); + if (!sk) { + return 0; + } - sock->sk = NULL; - release_sock(sk); - sock_put(sk); + if (sock_owned_by_user(sk)) { + pr_warn("bp_release: socket is in use by another thread, " + "skipping cleanup to avoid deadlock\n"); + return 0; } + lock_sock(sk); + sock_orphan(sk); + bp = bp_sk(sk); + + write_lock_bh(&bp_list_lock); + sk_del_node_init(sk); + write_unlock_bh(&bp_list_lock); + skb_queue_purge(&bp->queue); + + sock->sk = NULL; + release_sock(sk); + sock_put(sk); + return 0; } diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 93bc99b..5c55017 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -37,7 +37,7 @@ struct genl_family genl_fam = { .n_mcgrps = ARRAY_SIZE(genl_mcgrps), }; -int send_bundle_doit(void* payload, int payload_size, u_int32_t dest_node_id, +int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id) { void* msg_head; @@ -45,8 +45,7 @@ int send_bundle_doit(void* payload, int payload_size, u_int32_t dest_node_id, size_t msg_size; int ret; - msg_size = nla_total_size(sizeof(u64)) - + nla_total_size(sizeof(u_int32_t)) + msg_size = nla_total_size(sizeof(u_int32_t)) + nla_total_size(sizeof(u_int32_t)) + nla_total_size(payload_size); msg = genlmsg_new(msg_size, GFP_KERNEL); if (!msg) { @@ -159,6 +158,16 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) size_t payload_len; int ret; + if (!info->attrs[BP_GENL_A_DEST_NODE_ID] + || !info->attrs[BP_GENL_A_DEST_SERVICE_ID] + || !info->attrs[BP_GENL_A_SRC_NODE_ID] + || !info->attrs[BP_GENL_A_SRC_SERVICE_ID] + || !info->attrs[BP_GENL_A_PAYLOAD]) { + pr_err("deliver_bundle: missing required attributes\n"); + ret = -EINVAL; + goto out; + } + dest_node_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_NODE_ID]); dest_service_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_SERVICE_ID]); src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]); @@ -197,7 +206,7 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) read_unlock_bh(&bp_list_lock); if (!new_skb_queued) { - pr_err("deliver_bundle: no socket found (ion:%d.%d)\n", + pr_err("deliver_bundle: no socket found (ipn:%d.%d)\n", dest_node_id, dest_service_id); ret = -ENODEV; goto err_free; diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index f37d622..48b223a 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -5,7 +5,7 @@ extern struct genl_family genl_fam; -int send_bundle_doit(void* payload, int payload_size, u_int32_t dest_node_id, +int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); int request_bundle_doit( diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index 00a911a..a9d4cd9 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -16,11 +16,11 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { void *payload; - int payload_size; + size_t payload_size; u_int32_t node_id, service_id; - char eid[64]; - int eid_size; + char dest_eid[64]; int err = 0; + int written; if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { @@ -35,27 +35,24 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id) + 1; - if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { + written = snprintf(dest_eid, sizeof(dest_eid), "ipn:%u.%u", node_id, service_id); + if (written < 0 || written >= (int)sizeof(dest_eid)) { log_error("[ipn:%u.%u] handle_send_bundle: failed to construct EID string", node_id, service_id); return -EINVAL; } - err = bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size); + err = bp_send_to_eid(daemon->sdr, payload, payload_size, dest_eid); if (err < 0) { log_error("[ipn:%u.%u] handle_send_bundle: bp_send_to_eid failed with error %d", node_id, service_id, err); - goto out; + return err; } log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %d (bytes)", node_id, service_id, - eid, payload_size); + dest_eid, payload_size); return 0; - -out: - return err; } int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { diff --git a/daemon/ion.c b/daemon/ion.c index 9ac1b75..a54f305 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -101,7 +101,7 @@ int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { return -ENOENT; } -int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int eid_size) { +int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid) { Object sdr_buffer = 0; Object adu; int ret = 0; @@ -123,18 +123,19 @@ int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int adu = zco_create(sdr, ZcoSdrSource, sdr_buffer, 0, payload_size, ZcoOutbound); if (adu <= 0) { log_error("zco_create failed."); + sdr_free(sdr, sdr_buffer); ret = -ENOMEM; goto out; } if (bp_send(NULL, dest_eid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, adu, NULL) <= 0) { log_error("bp_send failed."); + sdr_free(sdr, sdr_buffer); ret = -EIO; goto out; } out: - if (sdr_buffer != 0) sdr_free(sdr, sdr_buffer); sdr_end_xn(sdr); return ret; } diff --git a/daemon/ion.h b/daemon/ion.h index ea0022d..b4dfb27 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -17,7 +17,7 @@ int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_ Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); -int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int eid_size); +int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid); void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, size_t *payload_size); From ea98614811f19e30ef2c601fa80327054355ae0f Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Mon, 28 Jul 2025 23:05:48 +0200 Subject: [PATCH 3/8] feat: extract source EID from bundle and expose it via msg_name in recvmsg Signed-off-by: Sylvain Pierrot --- bp_socket/af_bp.c | 54 +++++++++++++++------------------------ daemon/bp_genl_handlers.c | 6 +++-- daemon/ion.c | 12 ++++++--- daemon/ion.h | 4 +-- receiver.c | 9 ++++--- 5 files changed, 40 insertions(+), 45 deletions(-) diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 143c0c8..4eb9bd2 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -170,28 +170,20 @@ int bp_release(struct socket* sock) struct sock* sk = sock->sk; struct bp_sock* bp; - if (!sk) { - return 0; - } - - if (sock_owned_by_user(sk)) { - pr_warn("bp_release: socket is in use by another thread, " - "skipping cleanup to avoid deadlock\n"); - return 0; - } - - lock_sock(sk); - sock_orphan(sk); - bp = bp_sk(sk); + if (sk) { + lock_sock(sk); + sock_orphan(sk); + bp = bp_sk(sk); - write_lock_bh(&bp_list_lock); - sk_del_node_init(sk); - write_unlock_bh(&bp_list_lock); - skb_queue_purge(&bp->queue); + write_lock_bh(&bp_list_lock); + sk_del_node_init(sk); + write_unlock_bh(&bp_list_lock); + skb_queue_purge(&bp->queue); - sock->sk = NULL; - release_sock(sk); - sock_put(sk); + sock->sk = NULL; + release_sock(sk); + sock_put(sk); + } return 0; } @@ -338,20 +330,14 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } - src_addr->bp_family = AF_BP; - src_addr->bp_scheme = BP_SCHEME_IPN; - src_addr->bp_addr.ipn.node_id = BP_SKB_CB(skb)->src_node_id; - src_addr->bp_addr.ipn.service_id = BP_SKB_CB(skb)->src_service_id; - - if (msg->msg_name && msg->msg_namelen >= sizeof(struct sockaddr_bp)) { - memcpy(msg->msg_name, &src_addr, sizeof(struct sockaddr_bp)); - msg->msg_namelen = sizeof(struct sockaddr_bp); // important - } else if (msg->msg_name) { - pr_warn( - "bp_recvmsg: user msg_name buffer too small (%u bytes)\n", - msg->msg_namelen); - ret = -EINVAL; - goto out; + if (msg->msg_name) { + src_addr = (struct sockaddr_bp*)msg->msg_name; + src_addr->bp_family = AF_BP; + src_addr->bp_scheme = BP_SCHEME_IPN; + src_addr->bp_addr.ipn.node_id = BP_SKB_CB(skb)->src_node_id; + src_addr->bp_addr.ipn.service_id + = BP_SKB_CB(skb)->src_service_id; + msg->msg_namelen = sizeof(struct sockaddr_bp); } if (copy_to_iter(skb->data, skb->len, &msg->msg_iter) != skb->len) { diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index a9d4cd9..f750111 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -97,6 +97,7 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { void *handle_recv_thread(struct thread_args *args) { void *payload = NULL; size_t payload_size; + u_int32_t src_node_id, src_service_id; int err; bool bundle_present; Object adu; @@ -104,7 +105,8 @@ void *handle_recv_thread(struct thread_args *args) { adu = find_adu(args->sdr, args->node_id, args->service_id); bundle_present = adu != 0; - payload = bp_recv_once(args->sdr, args->node_id, args->service_id, &payload_size); + payload = bp_recv_once(args->sdr, args->node_id, args->service_id, &payload_size, &src_node_id, + &src_service_id); if (!payload) { log_error("[ipn:%u.%u] handle_recv_thread: failed to receive bundle", args->node_id, args->service_id); @@ -121,7 +123,7 @@ void *handle_recv_thread(struct thread_args *args) { } err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, payload, payload_size, - args->node_id, args->service_id, args->node_id, args->service_id); + src_node_id, src_service_id, args->node_id, args->service_id); if (err < 0) { log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", args->node_id, args->service_id, err); diff --git a/daemon/ion.c b/daemon/ion.c index a54f305..299d6df 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -140,8 +140,8 @@ int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid) return ret; } -void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, - size_t *payload_size) { +void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, size_t *payload_size, + u_int32_t *src_node_id, u_int32_t *src_service_id) { BpSAP sap; BpDelivery dlv; ZcoReader reader; @@ -196,8 +196,12 @@ void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, goto release_dlv; } - log_info("bp_recv_once: received bundle from %s", dlv.bundleSourceEid); - if (add_adu(sdr, dlv.adu, dest_node_id, dest_service_id, 10, 10) < 0) { + if (sscanf(dlv.bundleSourceEid, "ipn:%u.%u", src_node_id, src_service_id) != 2) { + log_error("bp_recv_once: failed to parse bundleSourceEid: %s", dlv.bundleSourceEid); + goto release_dlv; + } + + if (add_adu(sdr, dlv.adu, dest_node_id, dest_service_id, *src_node_id, *src_service_id) < 0) { log_error("bp_recv_once: failed to add bundle reference."); goto release_dlv; } diff --git a/daemon/ion.h b/daemon/ion.h index b4dfb27..a32ad6a 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -18,7 +18,7 @@ Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid); -void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, - size_t *payload_size); +void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, size_t *payload_size, + u_int32_t *src_node_id, u_int32_t *src_service_id); #endif \ No newline at end of file diff --git a/receiver.c b/receiver.c index eaec336..737e974 100644 --- a/receiver.c +++ b/receiver.c @@ -85,9 +85,12 @@ int main(int argc, char *argv[]) { } printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer); - printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id, - src_addr.bp_addr.ipn.service_id); - + if (msg.msg_namelen >= sizeof(struct sockaddr_bp)) { + printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id, + src_addr.bp_addr.ipn.service_id); + } else { + printf("Source address not available\n"); + } out: close(sfd); From be53cdacc7c4fd38a30f0008f682988d34b5fc17 Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Mon, 28 Jul 2025 23:31:43 +0200 Subject: [PATCH 4/8] fix: get source node_id and service_id when adu is keeped in memory Signed-off-by: Sylvain Pierrot --- daemon/ion.c | 23 ++++++++++++++++++----- daemon/ion.h | 1 + 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/daemon/ion.c b/daemon/ion.c index 299d6df..101c007 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -62,6 +62,17 @@ Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { return 0; } +struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { + struct adu_reference *ref; + + for (ref = adu_refs; ref != NULL; ref = ref->next) { + if (ref->dest_node_id == dest_node_id && ref->dest_service_id == dest_service_id) { + return ref; + } + } + return NULL; +} + int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { struct adu_reference *prev = NULL; struct adu_reference *current = adu_refs; @@ -145,7 +156,7 @@ void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, s BpSAP sap; BpDelivery dlv; ZcoReader reader; - Object adu; + struct adu_reference *adu_ref; u_int32_t own_node_id; void *payload = NULL; int eid_size; @@ -157,20 +168,22 @@ void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, s return NULL; } - adu = find_adu(sdr, dest_node_id, dest_service_id); - if (adu != 0) { - *payload_size = zco_source_data_length(sdr, adu); + adu_ref = find_adu_ref(sdr, dest_node_id, dest_service_id); + if (adu_ref != NULL) { + *payload_size = zco_source_data_length(sdr, adu_ref->adu); payload = malloc(*payload_size); if (!payload) { log_error("bp_recv_once: Failed to allocate memory for payload."); return NULL; } - zco_start_receiving(adu, &reader); + zco_start_receiving(adu_ref->adu, &reader); if (zco_receive_source(sdr, &reader, *payload_size, payload) < 0) { log_error("bp_recv_once: zco_receive_source failed."); free(payload); return NULL; } + *src_node_id = adu_ref->src_node_id; + *src_service_id = adu_ref->src_service_id; return payload; } diff --git a/daemon/ion.h b/daemon/ion.h index a32ad6a..c3a9bfb 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -14,6 +14,7 @@ struct adu_reference { int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id); +struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); From 5f52d3fa964ffc99852c8d541d9e2e3fb4e0d1af Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Tue, 29 Jul 2025 13:29:20 +0200 Subject: [PATCH 5/8] refactor: dedicated adu_ref.c and .h files Signed-off-by: Sylvain Pierrot --- daemon/adu_ref.c | 79 +++++++++++ daemon/adu_ref.h | 20 +++ daemon/bp_genl_handlers.c | 51 ++++---- daemon/ion.c | 268 ++++++++++++++++---------------------- daemon/ion.h | 20 +-- 5 files changed, 241 insertions(+), 197 deletions(-) create mode 100644 daemon/adu_ref.c create mode 100644 daemon/adu_ref.h diff --git a/daemon/adu_ref.c b/daemon/adu_ref.c new file mode 100644 index 0000000..902067d --- /dev/null +++ b/daemon/adu_ref.c @@ -0,0 +1,79 @@ +#include "adu_ref.h" +#include "log.h" +#include "sdr.h" +#include +#include + +static struct adu_reference *adu_refs = NULL; +static pthread_mutex_t adu_refs_mutex = PTHREAD_MUTEX_INITIALIZER; + +int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, + u_int32_t src_node_id, u_int32_t src_service_id) { + struct adu_reference *ref; + + if (pthread_mutex_lock(&adu_refs_mutex) != 0) { + log_error("add_adu: Failed to lock SDR mutex."); + return -1; + } + + ref = malloc(sizeof(struct adu_reference)); + if (!ref) { + log_error("add_adu: Failed to allocate memory for bundle reference."); + pthread_mutex_unlock(&adu_refs_mutex); + return -ENOMEM; + } + + ref->adu = adu; + ref->dest_node_id = dest_node_id; + ref->dest_service_id = dest_service_id; + ref->src_node_id = src_node_id; + ref->src_service_id = src_service_id; + ref->next = adu_refs; + adu_refs = ref; + + pthread_mutex_unlock(&adu_refs_mutex); + return 0; +} + +struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { + struct adu_reference *ref; + + for (ref = adu_refs; ref != NULL; ref = ref->next) { + if (ref->dest_node_id == dest_node_id && ref->dest_service_id == dest_service_id) { + return ref; + } + } + return NULL; +} + +Object remove_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { + struct adu_reference *prev = NULL; + struct adu_reference *current = adu_refs; + Object adu = 0; + + if (pthread_mutex_lock(&adu_refs_mutex) != 0) { + log_error("remove_adu_ref: Failed to lock adu_refs mutex."); + return -EAGAIN; + } + + while (current) { + if (current->dest_node_id == dest_node_id && current->dest_service_id == dest_service_id) { + adu = current->adu; + if (prev) { + prev->next = current->next; + } else { + adu_refs = current->next; + } + + free(current); + pthread_mutex_unlock(&adu_refs_mutex); + return adu; + } + prev = current; + current = current->next; + } + + pthread_mutex_unlock(&adu_refs_mutex); + log_warn("remove_adu_ref: no bundle found (ipn:%u.%u)", dest_node_id, dest_service_id); + return adu; +} \ No newline at end of file diff --git a/daemon/adu_ref.h b/daemon/adu_ref.h new file mode 100644 index 0000000..8fb42ab --- /dev/null +++ b/daemon/adu_ref.h @@ -0,0 +1,20 @@ +#ifndef ADU_REF_H +#define ADU_REF_H + +#include "bp.h" + +struct adu_reference { + Object adu; + u_int32_t dest_node_id; + u_int32_t dest_service_id; + u_int32_t src_node_id; + u_int32_t src_service_id; + struct adu_reference *next; +}; + +int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, + u_int32_t src_node_id, u_int32_t src_service_id); +struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); +Object remove_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); + +#endif \ No newline at end of file diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index f750111..7b22979 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -8,6 +8,7 @@ #include #include "../include/bp_socket.h" +#include "adu_ref.h" #include "bp.h" #include "bp_genl_handlers.h" #include "daemon.h" @@ -95,42 +96,29 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { } void *handle_recv_thread(struct thread_args *args) { - void *payload = NULL; - size_t payload_size; - u_int32_t src_node_id, src_service_id; int err; - bool bundle_present; - Object adu; + struct reply_bundle reply; - adu = find_adu(args->sdr, args->node_id, args->service_id); - bundle_present = adu != 0; - - payload = bp_recv_once(args->sdr, args->node_id, args->service_id, &payload_size, &src_node_id, - &src_service_id); - if (!payload) { - log_error("[ipn:%u.%u] handle_recv_thread: failed to receive bundle", args->node_id, - args->service_id); + reply = bp_recv_once(args->sdr, args->node_id, args->service_id); + if (!reply.is_present) { + log_warn("[ipn:%u.%u] REQUEST_BUNDLE: no bundle received", args->node_id, args->service_id); goto out; } - if (!bundle_present) { - log_info("[ipn:%u.%u] REQUEST_BUNDLE: bundle received, size %zu bytes", args->node_id, - args->service_id, payload_size); - } else { - log_warn("[ipn:%u.%u] REQUEST_BUNDLE: bundle reference already present in memory, size %zu " - "bytes", - args->node_id, args->service_id, payload_size); - } - - err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, payload, payload_size, - src_node_id, src_service_id, args->node_id, args->service_id); + err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, reply.payload, + reply.payload_size, reply.src_node_id, reply.src_service_id, + args->node_id, args->service_id); if (err < 0) { log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", args->node_id, args->service_id, err); + goto out; } + log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", reply.src_node_id, + reply.src_service_id); + out: - if (payload) free(payload); + if (reply.payload) free(reply.payload); free(args); return NULL; } @@ -204,8 +192,6 @@ int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void goto out; } - log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", dest_node_id, dest_service_id); - return 0; err_free_msg: @@ -216,6 +202,7 @@ int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { u_int32_t node_id, service_id; + Object adu; int ret = 0; if (!attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { @@ -229,12 +216,18 @@ int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - ret = destroy_adu(daemon->sdr, node_id, service_id); - if (ret < 0) { + adu = remove_adu_ref(daemon->sdr, node_id, service_id); + if (adu == 0) { log_error("[ipn:%u.%u] handle_destroy_bundle: failed to destroy bundle: %s", node_id, service_id, strerror(-ret)); goto out; } + ret = destroy_bundle(daemon->sdr, adu); + if (ret < 0) { + log_error("[ipn:%u.%u] handle_destroy_bundle: destroy_bundle failed with error %d", node_id, + service_id, ret); + goto out; + } log_info("[ipn:%u.%u] DESTROY_BUNDLE: bundle destroy from ION", node_id, service_id); diff --git a/daemon/ion.c b/daemon/ion.c index 101c007..87bf013 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,11 +1,10 @@ #include "ion.h" +#include "adu_ref.h" #include "log.h" #include "sdr.h" #include #include -static struct adu_reference *adu_refs = NULL; -static pthread_mutex_t adu_refs_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER; const char *bp_result_text(BpIndResult result) { @@ -23,95 +22,25 @@ const char *bp_result_text(BpIndResult result) { } } -int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, - u_int32_t src_node_id, u_int32_t src_service_id) { - struct adu_reference *ref; - - if (pthread_mutex_lock(&adu_refs_mutex) != 0) { - log_error("add_adu: Failed to lock SDR mutex."); - return -1; +int destroy_bundle(Sdr sdr, Object adu) { + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("destroy_bundle: Failed to lock SDR mutex."); + return -EAGAIN; } - ref = malloc(sizeof(struct adu_reference)); - if (!ref) { - log_error("add_adu: Failed to allocate memory for bundle reference."); - pthread_mutex_unlock(&adu_refs_mutex); - return -ENOMEM; + if (sdr_begin_xn(sdr) == 0) { + log_error("destroy_bundle: sdr_begin_xn failed."); + pthread_mutex_unlock(&sdrmutex); + return -EIO; } - ref->adu = adu; - ref->dest_node_id = dest_node_id; - ref->dest_service_id = dest_service_id; - ref->src_node_id = src_node_id; - ref->src_service_id = src_service_id; - ref->next = adu_refs; - adu_refs = ref; - - pthread_mutex_unlock(&adu_refs_mutex); - return 0; -} - -Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { - struct adu_reference *ref; + zco_destroy(sdr, adu); - for (ref = adu_refs; ref != NULL; ref = ref->next) { - if (ref->dest_node_id == dest_node_id && ref->dest_service_id == dest_service_id) { - return ref->adu; - } - } + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); return 0; } -struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { - struct adu_reference *ref; - - for (ref = adu_refs; ref != NULL; ref = ref->next) { - if (ref->dest_node_id == dest_node_id && ref->dest_service_id == dest_service_id) { - return ref; - } - } - return NULL; -} - -int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { - struct adu_reference *prev = NULL; - struct adu_reference *current = adu_refs; - - if (pthread_mutex_lock(&adu_refs_mutex) != 0) { - log_error("destroy_adu: Failed to lock adu_refs mutex."); - return -EAGAIN; - } - - while (current) { - if (current->dest_node_id == dest_node_id && current->dest_service_id == dest_service_id) { - if (prev) { - prev->next = current->next; - } else { - adu_refs = current->next; - } - - if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("destroy_adu: Failed to lock SDR mutex."); - free(current); - pthread_mutex_unlock(&adu_refs_mutex); - return -EAGAIN; - } - - zco_destroy(sdr, current->adu); - free(current); - pthread_mutex_unlock(&sdrmutex); - pthread_mutex_unlock(&adu_refs_mutex); - return 0; - } - prev = current; - current = current->next; - } - - pthread_mutex_unlock(&adu_refs_mutex); - log_warn("destroy_adu: no bundle found (ipn:%u.%u)", dest_node_id, dest_service_id); - return -ENOENT; -} - int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid) { Object sdr_buffer = 0; Object adu; @@ -151,114 +80,143 @@ int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid) return ret; } -void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, size_t *payload_size, - u_int32_t *src_node_id, u_int32_t *src_service_id) { +struct reply_bundle bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { BpSAP sap; BpDelivery dlv; ZcoReader reader; struct adu_reference *adu_ref; u_int32_t own_node_id; void *payload = NULL; + size_t payload_size; int eid_size; char eid[64]; + struct reply_bundle reply = {0}; + u_int32_t src_node_id, src_service_id; + + reply.is_present = false; + reply.payload = NULL; own_node_id = getOwnNodeNbr(); if (dest_node_id != own_node_id) { log_error("bp_recv_once: node ID mismatch. Expected %u, got %u", own_node_id, dest_node_id); - return NULL; + goto out; } adu_ref = find_adu_ref(sdr, dest_node_id, dest_service_id); if (adu_ref != NULL) { - *payload_size = zco_source_data_length(sdr, adu_ref->adu); - payload = malloc(*payload_size); + payload_size = zco_source_data_length(sdr, adu_ref->adu); + payload = malloc(payload_size); if (!payload) { log_error("bp_recv_once: Failed to allocate memory for payload."); - return NULL; + goto out; } zco_start_receiving(adu_ref->adu, &reader); - if (zco_receive_source(sdr, &reader, *payload_size, payload) < 0) { + if (zco_receive_source(sdr, &reader, payload_size, payload) < 0) { log_error("bp_recv_once: zco_receive_source failed."); - free(payload); - return NULL; + goto out; } - *src_node_id = adu_ref->src_node_id; - *src_service_id = adu_ref->src_service_id; - return payload; - } - eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", dest_node_id, dest_service_id); - if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { - log_error("bp_recv_once: failed to construct EID string."); - return NULL; - } + src_node_id = adu_ref->src_node_id; + src_service_id = adu_ref->src_service_id; + } else { + eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", dest_node_id, dest_service_id); + if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { + log_error("bp_recv_once: failed to construct EID string."); + goto out; + } - if (bp_open(eid, &sap) < 0) { - log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", dest_node_id, - dest_service_id); - return NULL; - } + if (bp_open(eid, &sap) < 0) { + log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", dest_node_id, + dest_service_id); + goto out; + } - if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { - log_error("bp_recv_once: bundle reception failed."); - goto close_sap; - } + if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { + log_error("bp_recv_once: bundle reception failed."); + bp_close(sap); + goto out; + } - if (dlv.result != BpPayloadPresent || dlv.adu == 0) { - log_error("bp_recv_once: %s", bp_result_text(dlv.result)); - goto release_dlv; - } + if (dlv.result != BpPayloadPresent || dlv.adu == 0) { + log_error("bp_recv_once: %s", bp_result_text(dlv.result)); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - if (sscanf(dlv.bundleSourceEid, "ipn:%u.%u", src_node_id, src_service_id) != 2) { - log_error("bp_recv_once: failed to parse bundleSourceEid: %s", dlv.bundleSourceEid); - goto release_dlv; - } + if (sscanf(dlv.bundleSourceEid, "ipn:%u.%u", &src_node_id, &src_service_id) != 2) { + log_error("bp_recv_once: failed to parse bundleSourceEid: %s", dlv.bundleSourceEid); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - if (add_adu(sdr, dlv.adu, dest_node_id, dest_service_id, *src_node_id, *src_service_id) < 0) { - log_error("bp_recv_once: failed to add bundle reference."); - goto release_dlv; - } + if (add_adu(sdr, dlv.adu, dest_node_id, dest_service_id, src_node_id, src_service_id) < 0) { + log_error("bp_recv_once: failed to add bundle reference."); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("bp_recv_once: Failed to lock SDR mutex."); - goto release_dlv; - } + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("bp_recv_once: Failed to lock SDR mutex."); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - if (sdr_begin_xn(sdr) == 0) { - log_error("bp_recv_once: sdr_begin_xn failed."); - pthread_mutex_unlock(&sdrmutex); - goto release_dlv; - } + if (sdr_begin_xn(sdr) == 0) { + log_error("bp_recv_once: sdr_begin_xn failed."); + pthread_mutex_unlock(&sdrmutex); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - *payload_size = zco_source_data_length(sdr, dlv.adu); - payload = malloc(*payload_size); - if (!payload) { - log_error("bp_recv_once: Failed to allocate memory for payload."); - sdr_end_xn(sdr); - pthread_mutex_unlock(&sdrmutex); - goto release_dlv; - } + payload_size = zco_source_data_length(sdr, dlv.adu); + payload = malloc(payload_size); + if (!payload) { + log_error("bp_recv_once: Failed to allocate memory for payload."); + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } + + zco_start_receiving(dlv.adu, &reader); + if (zco_receive_source(sdr, &reader, payload_size, payload) < 0) { + log_error("bp_recv_once: zco_receive_source failed."); + free(payload); + payload = NULL; + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - zco_start_receiving(dlv.adu, &reader); - if (zco_receive_source(sdr, &reader, *payload_size, payload) < 0) { - log_error("bp_recv_once: zco_receive_source failed."); - free(payload); - payload = NULL; sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); - goto release_dlv; + bp_release_delivery(&dlv, 0); + bp_close(sap); } - sdr_end_xn(sdr); - pthread_mutex_unlock(&sdrmutex); - bp_release_delivery(&dlv, 0); - bp_close(sap); + if (payload == NULL) { + log_info("bp_recv_once: no payload received for node_id=%u service_id=%u", dest_node_id, + dest_service_id); + goto out; + } - return payload; + reply.is_present = true; + reply.payload = payload; + reply.payload_size = payload_size; + reply.src_node_id = src_node_id; + reply.src_service_id = src_service_id; -release_dlv: - bp_release_delivery(&dlv, 0); -close_sap: - bp_close(sap); - return NULL; +out: + if (payload && !reply.is_present) { + free(payload); + } + return reply; } diff --git a/daemon/ion.h b/daemon/ion.h index c3a9bfb..f3ee93d 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -2,24 +2,18 @@ #define ION_H #include "bp.h" +#include -struct adu_reference { - Object adu; - u_int32_t dest_node_id; - u_int32_t dest_service_id; +struct reply_bundle { + bool is_present; + void *payload; + size_t payload_size; u_int32_t src_node_id; u_int32_t src_service_id; - struct adu_reference *next; }; -int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, - u_int32_t src_node_id, u_int32_t src_service_id); -struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); -Object find_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); -int destroy_adu(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); - +int destroy_bundle(Sdr sdr, Object adu); int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid); -void *bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id, size_t *payload_size, - u_int32_t *src_node_id, u_int32_t *src_service_id); +struct reply_bundle bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); #endif \ No newline at end of file From 88513291359a2c21343d99888c431cb6bdb8b88f Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Tue, 29 Jul 2025 16:22:35 +0200 Subject: [PATCH 6/8] feat: implement CANCEL_BUNDLE_REQUEST to cancel blocking recvmsg Signed-off-by: Sylvain Pierrot --- bp_socket/af_bp.c | 16 ++++--- bp_socket/af_bp.h | 5 ++- bp_socket/bp_genl.c | 64 ++++++++++++++++++++++++---- bp_socket/bp_genl.h | 1 + daemon/bp_genl_handlers.c | 88 +++++++++++++++++++++++++++++++++------ daemon/bp_genl_handlers.h | 2 + include/bp_socket.h | 1 + 7 files changed, 148 insertions(+), 29 deletions(-) diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 4eb9bd2..018ba89 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -28,8 +28,8 @@ static struct sock* bp_alloc_socket(struct net* net, int kern) sock_init_data(NULL, sk); bp = bp_sk(sk); - skb_queue_head_init(&bp->queue); - init_waitqueue_head(&bp->wait_queue); + skb_queue_head_init(&bp->rx_queue); + init_waitqueue_head(&bp->rx_waitq); bp->bp_node_id = 0; bp->bp_service_id = 0; } @@ -178,7 +178,7 @@ int bp_release(struct socket* sock) write_lock_bh(&bp_list_lock); sk_del_node_init(sk); write_unlock_bh(&bp_list_lock); - skb_queue_purge(&bp->queue); + skb_queue_purge(&bp->rx_queue); sock->sk = NULL; release_sock(sk); @@ -302,19 +302,25 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) } ret = wait_event_interruptible( - bp->wait_queue, !skb_queue_empty(&bp->queue)); + bp->rx_waitq, !skb_queue_empty(&bp->rx_queue) || bp->rx_canceled); if (ret < 0) { pr_err("bp_recvmsg: interrupted while waiting\n"); goto out; } + if (bp->rx_canceled) { + pr_info("bp_recvmsg: bundle request canceled\n"); + ret = -ECANCELED; + goto out; + } + if (sock_flag(sk, SOCK_DEAD)) { pr_err("bp_recvmsg: socket closed while waiting\n"); ret = -ESHUTDOWN; goto out; } - skb = skb_dequeue(&bp->queue); + skb = skb_dequeue(&bp->rx_queue); if (!skb) { pr_info("bp_recvmsg: no messages in the queue for service %d\n", bp->bp_service_id); diff --git a/bp_socket/af_bp.h b/bp_socket/af_bp.h index de109da..d6b4446 100644 --- a/bp_socket/af_bp.h +++ b/bp_socket/af_bp.h @@ -21,8 +21,9 @@ struct bp_sock { struct sock sk; u_int32_t bp_node_id; u_int8_t bp_service_id; - struct sk_buff_head queue; - wait_queue_head_t wait_queue; + struct sk_buff_head rx_queue; + wait_queue_head_t rx_waitq; + bool rx_canceled; }; int bp_bind(struct socket* sock, struct sockaddr* addr, int addr_len); diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 5c55017..fe090d7 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -13,12 +13,19 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = { }; static struct genl_ops genl_ops[] = { { - .cmd = BP_GENL_CMD_DELIVER_BUNDLE, - .flags = GENL_ADMIN_PERM, - .policy = nla_policy, - .doit = deliver_bundle_doit, - .dumpit = NULL, -} }; + .cmd = BP_GENL_CMD_DELIVER_BUNDLE, + .flags = GENL_ADMIN_PERM, + .policy = nla_policy, + .doit = deliver_bundle_doit, + .dumpit = NULL, + }, + { + .cmd = BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, + .flags = GENL_ADMIN_PERM, + .policy = nla_policy, + .doit = cancel_bundle_request_doit, + .dumpit = NULL, + } }; /* Multicast groups for our family */ static const struct genl_multicast_group genl_mcgrps[] = { @@ -147,6 +154,44 @@ int request_bundle_doit( return ret; } +int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info) +{ + struct sock* sk; + struct bp_sock* bp; + u_int32_t dest_node_id, dest_service_id; + + if (!info->attrs[BP_GENL_A_DEST_NODE_ID] + || !info->attrs[BP_GENL_A_DEST_SERVICE_ID]) { + pr_err("cancel_bundle_request: missing required attributes\n"); + return -EINVAL; + } + + dest_node_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_NODE_ID]); + dest_service_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_SERVICE_ID]); + + read_lock_bh(&bp_list_lock); + sk_for_each(sk, &bp_list) + { + bh_lock_sock(sk); + bp = bp_sk(sk); + + if (bp->bp_node_id == dest_node_id + && bp->bp_service_id == dest_service_id) { + + if (waitqueue_active(&bp->rx_waitq)) { + bp->rx_canceled = true; + wake_up_interruptible(&bp->rx_waitq); + } + bh_unlock_sock(sk); + break; + } + bh_unlock_sock(sk); + } + read_unlock_bh(&bp_list_lock); + + return 0; +} + int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) { struct sock* sk; @@ -194,10 +239,11 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) if (bp->bp_node_id == dest_node_id && bp->bp_service_id == dest_service_id) { - skb_queue_tail(&bp->queue, new_skb); + skb_queue_tail(&bp->rx_queue, new_skb); new_skb_queued = true; - if (waitqueue_active(&bp->wait_queue)) - wake_up_interruptible(&bp->wait_queue); + if (waitqueue_active(&bp->rx_waitq)) { + wake_up_interruptible(&bp->rx_waitq); + } bh_unlock_sock(sk); break; } diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index 48b223a..549cc31 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -10,6 +10,7 @@ int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); int request_bundle_doit( u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); +int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info); int destroy_bundle_doit( u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index 7b22979..965b551 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -99,28 +99,90 @@ void *handle_recv_thread(struct thread_args *args) { int err; struct reply_bundle reply; - reply = bp_recv_once(args->sdr, args->node_id, args->service_id); + reply = bp_recv_once(args->sdr, args->node_id, + args->service_id); // Blocking invocation to receive a bundle if (!reply.is_present) { - log_warn("[ipn:%u.%u] REQUEST_BUNDLE: no bundle received", args->node_id, args->service_id); + err = handle_cancel_bundle_request(args->netlink_family, args->netlink_sock, args->node_id, + args->service_id); + if (err < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request failed with error %d", + args->node_id, args->service_id, err); + goto out; + } + + log_info("[ipn:%u.%u] CANCEL_BUNDLE_REQUEST: bundle request cancelled", args->node_id, + args->service_id); + } else { + err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, reply.payload, + reply.payload_size, reply.src_node_id, reply.src_service_id, + args->node_id, args->service_id); + if (err < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", args->node_id, + args->service_id, err); + goto out; + } + + log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", reply.src_node_id, + reply.src_service_id); + } + +out: + if (reply.payload) free(reply.payload); + free(args); + return NULL; +} + +int handle_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, + u_int32_t node_id, u_int32_t service_id) { + struct nl_msg *msg = NULL; + void *hdr; + int ret; + + msg = nlmsg_alloc(); + if (!msg) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to allocate Netlink msg", + node_id, service_id); + ret = -ENOMEM; goto out; } - err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, reply.payload, - reply.payload_size, reply.src_node_id, reply.src_service_id, - args->node_id, args->service_id); - if (err < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", args->node_id, - args->service_id, err); + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, + BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to create Netlink header", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id) < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to add NODE_ID attribute", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id) < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to add SERVICE_ID attribute", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + ret = nl_send_sync(netlink_sock, msg); + if (ret < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: bundle request not cancelled", node_id, + service_id); + ret = -errno; goto out; } - log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", reply.src_node_id, - reply.src_service_id); + return 0; +err_free_msg: + nlmsg_free(msg); out: - if (reply.payload) free(reply.payload); - free(args); - return NULL; + return ret; } int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index f4be443..e0b10a7 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -14,6 +14,8 @@ struct thread_args { int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); +int handle_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, + u_int32_t node_id, u_int32_t service_id); int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, int payload_size, u_int32_t src_node_id, u_int32_t src_service_id, u_int32_t dest_node_id, u_int32_t dest_service_id); diff --git a/include/bp_socket.h b/include/bp_socket.h index 15b11eb..c962b9f 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -32,6 +32,7 @@ enum bp_genl_cmds { BP_GENL_CMD_UNSPEC, BP_GENL_CMD_SEND_BUNDLE, BP_GENL_CMD_REQUEST_BUNDLE, + BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_CMD_DESTROY_BUNDLE, __BP_GENL_CMD_MAX, From 65b7db5fd9087bb7b15933efdd15a59d54442e0a Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Tue, 29 Jul 2025 16:31:04 +0200 Subject: [PATCH 7/8] refactor: trigger an error when adu is keeped in memory Signed-off-by: Sylvain Pierrot --- daemon/bp_genl_handlers.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index 965b551..5b25e09 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -250,7 +250,7 @@ int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void "memory (no active BP socket " "client)", dest_node_id, dest_service_id); - ret = 0; // Do not return an error, just log it + ret = -ENODEV; goto out; } From 503f4c4aa465440e46c6538a85551d3d868900a5 Mon Sep 17 00:00:00 2001 From: Sylvain Pierrot Date: Tue, 29 Jul 2025 17:16:31 +0200 Subject: [PATCH 8/8] feat: enforce binding before sendmsg and recvmsg Signed-off-by: Sylvain Pierrot --- bp_socket/af_bp.c | 31 ++++++++++++++++++++++--------- bp_socket/bp_genl.c | 23 ++++++++++++++++++++--- bp_socket/bp_genl.h | 3 ++- daemon/bp_genl_handlers.c | 29 ++++++++++++++++------------- sender.c | 13 ++++++++++++- 5 files changed, 72 insertions(+), 27 deletions(-) diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 018ba89..9bf9d7b 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -192,9 +192,15 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) { struct sockaddr_bp* addr; void* payload; - u_int32_t service_id; - u_int32_t node_id; + u_int32_t dest_node_id, dest_service_id; int ret; + struct bp_sock* bp = bp_sk(sock->sk); + + if (bp->bp_node_id == 0 || bp->bp_service_id == 0) { + pr_err("bp_sendmsg: socket must be bound before sending\n"); + ret = -EADDRNOTAVAIL; + goto out; + } if (!msg->msg_name) { pr_err("bp_sendmsg: no destination address provided\n"); @@ -226,21 +232,21 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) goto out; } - service_id = addr->bp_addr.ipn.service_id; - node_id = addr->bp_addr.ipn.node_id; + dest_node_id = addr->bp_addr.ipn.node_id; + dest_service_id = addr->bp_addr.ipn.service_id; // https://www.rfc-editor.org/rfc/rfc9758.html#name-node-numbers - if (node_id > 0xFFFFFFFF) { + if (dest_node_id > 0xFFFFFFFF) { pr_err("bp_bind: invalid node ID (must be in [0;2^31])\n"); ret = -EINVAL; goto out; } // https://www.rfc-editor.org/rfc/rfc9758.html#name-service-numbers - if (service_id < 1 || service_id > 0xFFFFFFFF) { + if (dest_service_id < 1 || dest_service_id > 0xFFFFFFFF) { pr_err("bp_bind: invalid service ID %d (must be in " "[1;2^31])\n", - service_id); + dest_service_id); ret = -EINVAL; goto out; } @@ -265,8 +271,8 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) goto err_free; } - ret = send_bundle_doit( - payload, size, node_id, service_id, 8443); + ret = send_bundle_doit(payload, size, dest_node_id, + dest_service_id, bp->bp_node_id, bp->bp_service_id, 8443); if (ret < 0) { pr_err( "bp_sendmsg: send_bundle_doit failed (%d)\n", ret); @@ -295,6 +301,13 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) sk = sock->sk; lock_sock(sk); bp = bp_sk(sk); + + if (bp->bp_node_id == 0 || bp->bp_service_id == 0) { + pr_err("bp_recvmsg: socket must be bound before receiving\n"); + ret = -EADDRNOTAVAIL; + goto out; + } + ret = request_bundle_doit(bp->bp_node_id, bp->bp_service_id, 8443); if (ret < 0) { pr_err("bp_recvmsg: request_bundle_doit failed (%d)\n", ret); diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index fe090d7..78c028f 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -45,15 +45,16 @@ struct genl_family genl_fam = { }; int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, - u_int32_t dest_service_id, int port_id) + u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, + int port_id) { void* msg_head; struct sk_buff* msg; size_t msg_size; int ret; - msg_size = nla_total_size(sizeof(u_int32_t)) - + nla_total_size(sizeof(u_int32_t)) + nla_total_size(payload_size); + msg_size = 4 * nla_total_size(sizeof(u_int32_t)) + + nla_total_size(payload_size); msg = genlmsg_new(msg_size, GFP_KERNEL); if (!msg) { pr_err("send_bundle: failed to allocate message buffer\n"); @@ -85,6 +86,22 @@ int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, goto err_cancel; } + ret = nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id); + if (ret) { + pr_err( + "send_bundle: failed to put BP_GENL_A_SRC_NODE_ID (%d)\n", + ret); + goto err_cancel; + } + + ret = nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id); + if (ret) { + pr_err("send_bundle: failed to put BP_GENL_A_SRC_SERVICE_ID " + "(%d)\n", + ret); + goto err_cancel; + } + ret = nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload); if (ret) { pr_err( diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index 549cc31..3dc1153 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -6,7 +6,8 @@ extern struct genl_family genl_fam; int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, - u_int32_t dest_service_id, int port_id); + u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, + int port_id); int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); int request_bundle_doit( u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index 5b25e09..dbaa63c 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -18,13 +18,14 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { void *payload; size_t payload_size; - u_int32_t node_id, service_id; + u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; char dest_eid[64]; int err = 0; int written; if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] || - !attrs[BP_GENL_A_DEST_SERVICE_ID]) { + !attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_SRC_NODE_ID] || + !attrs[BP_GENL_A_SRC_SERVICE_ID]) { log_error( "handle_send_bundle: missing attribute(s) in SEND_BUNDLE command (payload, node ID, " "service ID)"); @@ -33,25 +34,27 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { payload = nla_data(attrs[BP_GENL_A_PAYLOAD]); payload_size = nla_len(attrs[BP_GENL_A_PAYLOAD]); - node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); + dest_node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + dest_service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); + src_node_id = nla_get_u32(attrs[BP_GENL_A_SRC_NODE_ID]); + src_service_id = nla_get_u32(attrs[BP_GENL_A_SRC_SERVICE_ID]); - written = snprintf(dest_eid, sizeof(dest_eid), "ipn:%u.%u", node_id, service_id); + written = snprintf(dest_eid, sizeof(dest_eid), "ipn:%u.%u", dest_node_id, dest_service_id); if (written < 0 || written >= (int)sizeof(dest_eid)) { - log_error("[ipn:%u.%u] handle_send_bundle: failed to construct EID string", node_id, - service_id); + log_error("[ipn:%u.%u] handle_send_bundle: failed to construct EID string", src_node_id, + src_service_id); return -EINVAL; } err = bp_send_to_eid(daemon->sdr, payload, payload_size, dest_eid); if (err < 0) { - log_error("[ipn:%u.%u] handle_send_bundle: bp_send_to_eid failed with error %d", node_id, - service_id, err); + log_error("[ipn:%u.%u] handle_send_bundle: bp_send_to_eid failed with error %d", + dest_node_id, dest_service_id, err); return err; } - log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %d (bytes)", node_id, service_id, - dest_eid, payload_size); + log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %d (bytes)", src_node_id, + src_service_id, dest_eid, payload_size); return 0; } @@ -122,8 +125,8 @@ void *handle_recv_thread(struct thread_args *args) { goto out; } - log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", reply.src_node_id, - reply.src_service_id); + log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", args->node_id, + args->service_id); } out: diff --git a/sender.c b/sender.c index 87c250a..e7d717a 100644 --- a/sender.c +++ b/sender.c @@ -9,7 +9,7 @@ #define AF_BP 28 int main(int argc, char *argv[]) { - struct sockaddr_bp dest_addr; + struct sockaddr_bp dest_addr, src_addr; int fd; uint32_t node_id, service_id; int ret = 0; @@ -38,6 +38,17 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } + src_addr.bp_family = AF_BP; + src_addr.bp_scheme = BP_SCHEME_IPN; + src_addr.bp_addr.ipn.node_id = 10; + src_addr.bp_addr.ipn.service_id = 1; + + if (bind(fd, (struct sockaddr *)&src_addr, sizeof(src_addr)) == -1) { + perror("Failed to bind socket"); + ret = EXIT_FAILURE; + goto out; + } + dest_addr.bp_family = AF_BP; dest_addr.bp_scheme = BP_SCHEME_IPN; dest_addr.bp_addr.ipn.node_id = node_id;