Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions components/livekit/core/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ typedef struct {
} detail;
} engine_event_t;

// Upper bound on the number of remote audio tracks tracked as subscribed;
// sizes the SID array in session_state_t.
#define MAX_AUDIO_SUBSCRIPTIONS 4

/// Session state kept by the engine (accessed as `eng->session`).
typedef struct {
bool is_subscriber_primary;
livekit_pb_sid_t local_participant_sid;
// SID of a single subscribed audio track. subscribe_tracks treats a
// non-empty value (first byte != '\0') as "already subscribed".
// NOTE(review): looks superseded by sub_audio_track_sids — confirm whether
// this field is still needed.
livekit_pb_sid_t sub_audio_track_sid;
// SIDs of the subscribed audio tracks; only the first
// sub_audio_track_count entries are in use.
livekit_pb_sid_t sub_audio_track_sids[MAX_AUDIO_SUBSCRIPTIONS];
// Number of entries in use in sub_audio_track_sids. subscribe_tracks stops
// adding once this reaches MAX_AUDIO_SUBSCRIPTIONS.
uint8_t sub_audio_track_count;
} session_state_t;

typedef struct {
Expand Down Expand Up @@ -152,19 +155,32 @@ static engine_err_t subscribe_tracks(engine_t *eng, livekit_pb_track_info_t *tra
if (tracks == NULL || count <= 0) {
return ENGINE_ERR_INVALID_ARG;
}
if (eng->session.sub_audio_track_sid[0] != '\0') {
return ENGINE_ERR_MAX_SUB;
}
for (int i = 0; i < count; i++) {
livekit_pb_track_info_t *track = &tracks[i];
if (track->type != LIVEKIT_PB_TRACK_TYPE_AUDIO) {
continue;
}
// For now, subscribe to the first audio track.
// Check if already subscribed to this track
bool already_subscribed = false;
for (uint8_t j = 0; j < eng->session.sub_audio_track_count; j++) {
if (strncmp(eng->session.sub_audio_track_sids[j], track->sid,
sizeof(livekit_pb_sid_t)) == 0) {
already_subscribed = true;
break;
}
}
if (already_subscribed) {
continue;
}
if (eng->session.sub_audio_track_count >= MAX_AUDIO_SUBSCRIPTIONS) {
ESP_LOGW(TAG, "Max audio subscriptions reached (%d)", MAX_AUDIO_SUBSCRIPTIONS);
return ENGINE_ERR_MAX_SUB;
}
ESP_LOGI(TAG, "Subscribing to audio track: sid=%s", track->sid);
signal_send_update_subscription(eng->signal_handle, track->sid, true);
strlcpy(eng->session.sub_audio_track_sid, track->sid, sizeof(eng->session.sub_audio_track_sid));
break;
strlcpy(eng->session.sub_audio_track_sids[eng->session.sub_audio_track_count],
track->sid, sizeof(livekit_pb_sid_t));
eng->session.sub_audio_track_count++;
}
return ENGINE_ERR_NONE;
}
Expand Down Expand Up @@ -692,12 +708,11 @@ static bool handle_join(engine_t *eng, const livekit_pb_join_response_t *join)

// 6. Subscribe to remote tracks that have already been published.
for (pb_size_t i = 0; i < join->other_participants_count; i++) {
engine_err_t ret = subscribe_tracks(
subscribe_tracks(
eng,
join->other_participants[i].tracks,
join->other_participants[i].tracks_count
);
if (ret == ENGINE_ERR_MAX_SUB) break;
}
return true;
}
Expand Down
245 changes: 240 additions & 5 deletions components/livekit/core/peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include <stdlib.h>
#include <string.h>
#include "esp_log.h"
#include "esp_peer.h"
#include "esp_peer_default.h"
Expand All @@ -36,6 +37,121 @@ static const char *PUB_TAG = "livekit_peer.pub";
#define PC_RESUME_BIT (1 << 2)
#define PC_SEND_QUIT_BIT (1 << 3)

// RFC 6464 audio level RTP header extension constants

// Extension URI written into the injected "a=extmap:<id> <uri>" SDP attribute.
#define AUDIO_LEVEL_URI "urn:ietf:params:rtp-hdrext:ssrc-audio-level"
// Fixed level value OR-ed with the voice-activity flag (0x80) into the
// extension data byte of every transformed packet.
#define AUDIO_LEVEL_DEFAULT 30 // Placeholder: -30 dBov (fixed level, not measured from actual audio)
// Number of bytes inserted between the RTP header and the payload; the
// encoded packet size is the original size plus this value.
#define RTP_EXT_BLOCK_SIZE 8 // 4-byte profile+length header + 1-byte element ID + 1-byte audio level + 2-byte padding

// MARK: - Audio level RTP header extension (RFC 6464)

/// Returns the lowest extmap ID in the range 1-14 that is not declared by any
/// "a=extmap:" attribute starting inside [section_start, section_end).
///
/// Only attributes that begin on their own line (preceded by '\n') are
/// considered, and only IDs in the one-byte-header range 1-14 are tracked.
///
/// @param section_start First character of the SDP media section to scan.
/// @param section_end   One past the last character of the section; an
///                      attribute starting at or after this point is ignored.
/// @return The smallest free ID, or 0 when all of 1-14 are already taken.
static uint8_t sdp_find_unused_extmap_id(const char *section_start, const char *section_end)
{
    static const char marker[] = "\na=extmap:";
    uint16_t taken = 0; // bit N set <=> ID N is already in use

    for (const char *cursor = strstr(section_start, marker);
         cursor != NULL && cursor < section_end;
         cursor = strstr(cursor, marker)) {
        cursor += sizeof(marker) - 1; // step over the marker to the numeric ID
        int declared = atoi(cursor);
        if (declared < 1 || declared > 14) {
            continue;
        }
        taken |= (1u << declared);
    }

    for (uint8_t candidate = 1; candidate <= 14; candidate++) {
        if ((taken & (1u << candidate)) == 0) {
            return candidate;
        }
    }
    return 0;
}

/// Parses the Opus payload type from an SDP string.
///
/// Walks every "a=rtpmap:" attribute and returns the payload type of the first
/// one of the form "a=rtpmap:<pt> opus/...". The encoding name is compared
/// case-insensitively, and the payload type must be a decimal number in the
/// range 0-127 followed by exactly one space.
///
/// @param sdp NUL-terminated SDP text. NULL is accepted and yields 0.
/// @return The Opus payload type, or 0 when no matching rtpmap entry exists.
///         Callers treat 0 as "not found".
static uint8_t sdp_parse_opus_payload_type(const char *sdp)
{
    static const char attr[] = "a=rtpmap:";
    static const char codec[] = "opus"; // lower-case; input is folded before comparing

    if (sdp == NULL) {
        return 0;
    }

    for (const char *p = strstr(sdp, attr); p != NULL; p = strstr(p, attr)) {
        p += sizeof(attr) - 1;

        // strtol (unlike atoi) reports where the number ended, so malformed
        // entries such as "111x opus/..." can be rejected.
        char *after_pt = NULL;
        long pt = strtol(p, &after_pt, 10);
        if (after_pt == p || pt < 0 || pt > 127 || *after_pt != ' ') {
            continue;
        }

        // Compare the encoding name in place. The loop stops at the first
        // mismatch (including the NUL terminator), so it never reads past the
        // end of the string.
        const char *name = after_pt + 1;
        size_t matched = 0;
        while (codec[matched] != '\0' && (name[matched] | 0x20) == codec[matched]) {
            matched++;
        }
        if (codec[matched] == '\0' && name[matched] == '/') {
            return (uint8_t)pt;
        }
    }
    return 0;
}

/// Injects the ssrc-audio-level extmap attribute into the audio section of an SDP string.
///
/// The new "a=extmap:<id> <uri>\r\n" line is inserted immediately before the
/// audio section's direction attribute (a=sendrecv, a=sendonly or a=recvonly,
/// tried in that order). The ID is the smallest extmap ID (1-14) not already
/// used in the audio section, so it cannot collide with existing mappings.
///
/// @param sdp            NUL-terminated SDP text; not modified.
/// @param out_extmap_id  Receives the chosen ID on success; left untouched on failure.
/// @return A newly allocated SDP string with the extmap line inserted, which
///         the caller must free(). NULL when an argument is NULL, the SDP has
///         no audio section, no ID is free, no supported direction attribute
///         is found, the line cannot be formatted, or allocation fails.
static char *sdp_inject_audio_level_extmap(const char *sdp, uint8_t *out_extmap_id)
{
    // Candidate insertion anchors, in priority order. The leading '\n' pins
    // each match to the start of a line.
    static const char *const direction_attrs[] = {
        "\na=sendrecv",
        "\na=sendonly",
        "\na=recvonly",
    };

    if (sdp == NULL || out_extmap_id == NULL) {
        return NULL;
    }

    // Find "m=audio" section
    const char *m_audio = strstr(sdp, "m=audio");
    if (m_audio == NULL) {
        return NULL;
    }

    // Determine audio section boundary (ends at next "m=" line or end of string)
    size_t orig_len = strlen(sdp);
    const char *audio_section_end = strstr(m_audio + 1, "\nm=");
    if (audio_section_end == NULL) {
        audio_section_end = sdp + orig_len;
    }

    // Find an unused extmap ID
    uint8_t extmap_id = sdp_find_unused_extmap_id(m_audio, audio_section_end);
    if (extmap_id == 0) {
        return NULL;
    }

    // Find direction attribute as insertion point; matches that fall in a
    // later media section are ignored.
    const char *insert_before = NULL;
    for (size_t i = 0; i < sizeof(direction_attrs) / sizeof(direction_attrs[0]); i++) {
        const char *hit = strstr(m_audio, direction_attrs[i]);
        if (hit != NULL && hit < audio_section_end) {
            insert_before = hit + 1; // skip the leading \n
            break;
        }
    }
    if (insert_before == NULL) {
        return NULL;
    }

    // Build the extmap line. The result is used as a memcpy length below, so
    // a formatting error or truncation must be rejected here.
    char extmap_line[80];
    int formatted = snprintf(extmap_line, sizeof(extmap_line),
                             "a=extmap:%u %s\r\n", (unsigned)extmap_id, AUDIO_LEVEL_URI);
    if (formatted < 0 || (size_t)formatted >= sizeof(extmap_line)) {
        return NULL;
    }
    size_t extmap_len = (size_t)formatted;

    char *new_sdp = malloc(orig_len + extmap_len + 1);
    if (new_sdp == NULL) {
        return NULL;
    }

    // Copy everything before the insertion point
    size_t prefix_len = (size_t)(insert_before - sdp);
    memcpy(new_sdp, sdp, prefix_len);
    // Insert the extmap line
    memcpy(new_sdp + prefix_len, extmap_line, extmap_len);
    // Copy the rest (including null terminator)
    memcpy(new_sdp + prefix_len + extmap_len, insert_before, orig_len - prefix_len + 1);

    *out_extmap_id = extmap_id;
    return new_sdp;
}

typedef struct {
peer_options_t options;
esp_peer_role_t ice_role;
Expand All @@ -50,11 +166,94 @@ typedef struct {
uint16_t reliable_stream_id;
uint16_t lossy_stream_id;

uint8_t audio_level_extmap_id; // Negotiated extmap ID for audio level extension
uint8_t opus_payload_type; // Negotiated Opus payload type from SDP

#if CONFIG_LK_BENCHMARK
uint64_t start_time;
#endif
} peer_t;

/// RTP transformer callback: reports the size of the packet after the audio
/// level extension block has been added (original size + RTP_EXT_BLOCK_SIZE).
///
/// Returns ESP_PEER_ERR_NOT_SUPPORT, leaving the frame untouched, when the
/// packet should not be rewritten: no peer context, Opus payload type not yet
/// known, a different payload type, a missing or too-short packet, or a packet
/// that already carries a header extension.
///
/// @param frame     Frame being sent; `encoded_size` is set on success.
/// @param in_place  Set to false on success (the transform writes to a separate buffer).
/// @param ctx       The owning peer_t.
/// @return 0 on success, ESP_PEER_ERR_NOT_SUPPORT otherwise.
static int audio_level_get_encoded_size(esp_peer_rtp_frame_t *frame, bool *in_place, void *ctx)
{
    const peer_t *peer = (const peer_t *)ctx;

    // The Opus payload type stays 0 until it has been parsed from the SDP.
    if (peer == NULL || peer->opus_payload_type == 0) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }
    if (frame->payload_type != peer->opus_payload_type) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }

    // Need at least the 12-byte fixed RTP header.
    const uint8_t *packet = frame->orig_data;
    if (packet == NULL || frame->orig_size < 12) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }

    // X bit set: the packet already has header extensions, leave it alone.
    const uint8_t first_byte = packet[0];
    if ((first_byte & 0x10) != 0) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }

    // The CSRC list (4 bytes per entry) must fit inside the packet.
    const uint8_t csrc_count = first_byte & 0x0F;
    const uint32_t header_len = (uint32_t)(12 + csrc_count * 4);
    if (frame->orig_size < header_len) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }

    *in_place = false;
    frame->encoded_size = frame->orig_size + RTP_EXT_BLOCK_SIZE;
    return 0;
}

/// RTP transformer callback: inject audio level header extension into RTP packet.
///
/// Inserts an RFC 5285 one-byte header extension with RFC 6464 audio level data
/// between the RTP fixed header (plus CSRC list) and the payload.
///
/// Packet layout after transform:
/// [RTP Header (X bit set)] [Extension Block (8 bytes)] [Payload]
///
/// The packet length is validated here as well as in
/// audio_level_get_encoded_size, so a packet shorter than its declared header
/// can never make the payload length underflow.
///
/// @param frame  Frame with `orig_data`/`orig_size` as input and `encoded_data` as
///               the output buffer, which must hold orig_size + RTP_EXT_BLOCK_SIZE bytes.
/// @param ctx    The owning peer_t; when NULL, extmap ID 1 is used.
/// @return 0 on success, ESP_PEER_ERR_NOT_SUPPORT on a NULL or malformed frame.
static int audio_level_transform(esp_peer_rtp_frame_t *frame, void *ctx)
{
    peer_t *peer = (peer_t *)ctx;
    if (frame == NULL) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }

    uint8_t *orig = frame->orig_data;
    uint8_t *enc = frame->encoded_data;
    if (orig == NULL || enc == NULL) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }
    // The first byte is read below, and the fixed RTP header is 12 bytes.
    if (frame->orig_size < 12) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }

    uint8_t extmap_id = (peer != NULL) ? peer->audio_level_extmap_id : 1;

    // Calculate RTP header length: 12 bytes fixed + 4 * CC (CSRC count)
    uint8_t cc = orig[0] & 0x0F;
    uint32_t header_len = 12u + (uint32_t)cc * 4u;

    // Reject packets shorter than their own header; otherwise the payload
    // length computed below would wrap around.
    if (frame->orig_size < header_len) {
        return ESP_PEER_ERR_NOT_SUPPORT;
    }

    // Copy RTP header
    memcpy(enc, orig, header_len);

    // Set the X (extension) bit in the first byte
    enc[0] |= 0x10;

    // Build the extension block (8 bytes total, one 32-bit word of extension data)
    uint8_t *ext = enc + header_len;
    ext[0] = 0xBE; // RFC 5285 one-byte header profile
    ext[1] = 0xDE;
    ext[2] = 0x00; // Extension length: 1 word (32 bits)
    ext[3] = 0x01;
    ext[4] = (uint8_t)((extmap_id << 4) | 0); // ID=negotiated, L=0 (1 byte of data follows)
    ext[5] = 0x80 | AUDIO_LEVEL_DEFAULT; // V=1 (voice active), placeholder level
    ext[6] = 0x00; // Padding
    ext[7] = 0x00; // Padding

    // Copy payload after the extension block
    memcpy(enc + header_len + RTP_EXT_BLOCK_SIZE,
           orig + header_len,
           frame->orig_size - header_len);

    return 0;
}

// Callback table for the audio level RTP transformer; passed to
// esp_peer_set_rtp_transformer() for publisher peers in peer_create.
static esp_peer_rtp_transform_cb_t audio_level_transform_cb = {
.get_encoded_size = audio_level_get_encoded_size,
.transform = audio_level_transform
};

static esp_peer_media_dir_t get_media_direction(esp_peer_media_dir_t direction, peer_role_t role) {
switch (role) {
case PEER_ROLE_PUBLISHER: return direction & ESP_PEER_MEDIA_DIR_SEND_ONLY;
Expand Down Expand Up @@ -148,12 +347,31 @@ static int on_msg(esp_peer_msg_t *info, void *ctx)
{
peer_t *peer = (peer_t *)ctx;
switch (info->type) {
case ESP_PEER_MSG_TYPE_SDP:
ESP_LOGI(TAG(peer), "Generated %s:\n%s",
peer->options.role == PEER_ROLE_PUBLISHER ? "offer" : "answer",
(char *)info->data);
peer->options.on_sdp((char *)info->data, peer->options.role, peer->options.ctx);
case ESP_PEER_MSG_TYPE_SDP: {
const char *sdp = (const char *)info->data;

// For publisher SDP, inject audio level extmap for Active Speaker detection
if (peer->options.role == PEER_ROLE_PUBLISHER) {
uint8_t extmap_id = 0;
char *patched_sdp = sdp_inject_audio_level_extmap(sdp, &extmap_id);
if (patched_sdp != NULL) {
peer->audio_level_extmap_id = extmap_id;
peer->opus_payload_type = sdp_parse_opus_payload_type(patched_sdp);
ESP_LOGD(TAG(peer), "Generated offer (with audio-level extmap id=%u):\n%s",
extmap_id, patched_sdp);
peer->options.on_sdp(patched_sdp, peer->options.role, peer->options.ctx);
free(patched_sdp);
} else {
ESP_LOGW(TAG(peer), "Failed to inject extmap, sending original SDP");
ESP_LOGD(TAG(peer), "Generated offer:\n%s", sdp);
peer->options.on_sdp(sdp, peer->options.role, peer->options.ctx);
}
} else {
ESP_LOGD(TAG(peer), "Generated answer:\n%s", sdp);
peer->options.on_sdp(sdp, peer->options.role, peer->options.ctx);
}
break;
}
default:
ESP_LOGD(TAG(peer), "Unhandled msg type: %d", info->type);
break;
Expand Down Expand Up @@ -329,6 +547,23 @@ peer_err_t peer_create(peer_handle_t *handle, peer_options_t *options)
free(peer);
return PEER_ERR_RTC;
}

// Set RTP transformer for publisher to inject audio level header extension
if (options->role == PEER_ROLE_PUBLISHER &&
options->media->audio_info.codec != ESP_PEER_AUDIO_CODEC_NONE) {
int ret = esp_peer_set_rtp_transformer(
peer->connection,
ESP_PEER_RTP_TRANSFORM_ROLE_SENDER,
&audio_level_transform_cb,
peer
);
if (ret != ESP_PEER_ERR_NONE) {
ESP_LOGW(TAG(peer), "Failed to set audio level RTP transformer: %d", ret);
} else {
ESP_LOGI(TAG(peer), "Audio level RTP transformer enabled");
}
}

*handle = (peer_handle_t)peer;
return PEER_ERR_NONE;
}
Expand Down