diff --git a/CMakeLists.txt b/CMakeLists.txt index 82b328b..b93f97a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,13 @@ if(CMAKE_BUILD_TYPE MATCHES "Release") set_target_properties(ixwebsocket PROPERTIES LINK_FLAGS_RELEASE "-s -w") #-static-libgcc -static-libstdc++ endif() -add_library(mod_openai_audio_stream SHARED mod_openai_audio_stream.c mod_openai_audio_stream.h openai_audio_streamer_glue.h openai_audio_streamer_glue.cpp buffer/ringbuffer.c base64.cpp) +add_library(mod_openai_audio_stream SHARED + mod_openai_audio_stream.c + mod_openai_audio_stream.h + openai_audio_streamer_glue.h + openai_audio_streamer_glue.cpp + base64.cpp +) set_property(TARGET mod_openai_audio_stream PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/README.md b/README.md index 9e6c48a..a8fe093 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,18 @@ -# mod_openai_audio_stream +# mod_openai_realtime -![Build](https://github.com/VoiSmart/mod_openai_audio_stream/actions/workflows/build.yml/badge.svg?branch=main) -![Code-Checks](https://github.com/VoiSmart/mod_openai_audio_stream/actions/workflows/code-checks.yml/badge.svg?branch=main) +![Build](https://github.com/VoiSmart/mod_openai_realtime/actions/workflows/build.yml/badge.svg?branch=main) +![Code-Checks](https://github.com/VoiSmart/mod_openai_realtime/actions/workflows/code-checks.yml/badge.svg?branch=main) [![License: MIT](https://img.shields.io/badge/license-MIT-blue?style=flat)](LICENSE) -**mod_openai_audio_stream** is a FreeSWITCH module that streams L16 audio from a channel to an OpenAI Realtime WebSocket endpoint. The stream follows OpenAI's Realtime API specification and enables real-time audio playback directly in the channel. +**mod_openai_realtime** is a FreeSWITCH module that streams L16 audio from a channel to an OpenAI Realtime WebSocket endpoint. The stream follows OpenAI's Realtime API specification and enables real-time audio playback directly in the channel. + +> [!WARNING] +> This is a standalone fork of `mod_audio_stream`, not affiliated with the original project. +> Legacy naming (`mod_openai_audio_stream`) is retained for backward compatibility but will be updated in a future major release. It is a fork of [mod_audio_stream](https://github.com/amigniter/mod_audio_stream), specifically adapted for streaming audio to OpenAI's Realtime API and playing the responses back to the user via FreeSWITCH and WebSocket. -The goal of **mod_openai_audio_stream** is to provide a simple, lightweight, yet effective module for streaming audio and receiving responses directly from OpenAI’s Realtime WebSocket into the call through FreeSWITCH. It uses [ixwebsocket](https://machinezone.github.io/IXWebSocket/), a C++ WebSocket library compiled as a static library. +The goal of **mod_openai_realtime** is to provide a simple, lightweight, yet effective module for streaming audio and receiving responses directly from OpenAI’s Realtime WebSocket into the call through FreeSWITCH. It uses [ixwebsocket](https://machinezone.github.io/IXWebSocket/), a C++ WebSocket library compiled as a static library. ## Important Notes @@ -76,7 +80,7 @@ The following is **a simple dialplan example** that demonstrates how to use the * Make sure to replace `sk-xxxxxxxxxxxxxxxxxx` with your actual OpenAI API key. * The dialplan answers the call and starts streaming audio to OpenAI's Realtime API using `uuid_openai_audio_stream`, so you can try it out and see the OpenAI events in the FreeSWITCH console within the `mod_openai_audio_stream::json` events and other module events. -* The playback action with `silence_stream://-1//` is needed for audio playback to work properly. For more details, check issue [#16](https://github.com/VoiSmart/mod_openai_audio_stream/issues/16). +* The playback action with `silence_stream://-1//` is needed for audio playback to work properly. For more details, check issue [#16](https://github.com/VoiSmart/mod_openai_realtime/issues/16). #### Next steps @@ -85,7 +89,7 @@ The **getting started** example is a basic demonstration of how to use the modul This way you can build more complex applications **allowing for function calls, updating instructions**, and other interactions with OpenAI's Realtime API. Check out the [OpenAI Realtime documentation](https://platform.openai.com/docs/guides/realtime) and [API reference](https://platform.openai.com/docs/api-reference/realtime) for more details on how to structure your requests and handle responses. ### Channel variables -The following channel variables can be used to fine tune websocket connection and also configure mod_openai_audio_stream logging: +The following channel variables can be used to fine-tune websocket connection and also configure mod_openai_realtime logging: | Variable | Description | Default | | -------------------------------------- | ------------------------------------------------------- | ------- | diff --git a/buffer/ringbuffer.c b/buffer/ringbuffer.c deleted file mode 100644 index e752df0..0000000 --- a/buffer/ringbuffer.c +++ /dev/null @@ -1,123 +0,0 @@ -#include "ringbuffer.h" - -#define NO_MEM_COPY -#ifndef NO_MEM_COPY -#include -#else -#ifndef memcpy -static void *memcpy(void *dst, const void *src, size_t n) -{ - size_t i = 0; - /* Verify if n, and the pointers are word aligned. - * If it's word aligned copy by word. - */ - if((uintptr_t)dst % sizeof(uint32_t) == 0 && - (uintptr_t)src % sizeof(uint32_t) == 0 && - n % sizeof(uint32_t) == 0) { - - uint32_t *d_word = dst; - const uint32_t *s_word = src; - - for (i=0; itail = 0; - buffer->head = 0; - buffer->sizeMask = len-1; - buffer->data = data; - return 1; -} - -size_t ringBufferLen(const RingBuffer *buffer) { - if(buffer->tail >= buffer->head) { - return buffer->tail-buffer->head; - } - - return buffer->sizeMask-(buffer->head-buffer->tail)+1; -} - -uint8_t ringBufferEmpty(const RingBuffer *buffer) { - return (buffer->tail == buffer->head); -} - -size_t ringBufferFreeSpace(const RingBuffer *buffer){ - return buffer->sizeMask - ringBufferLen(buffer); -} - -size_t ringBufferMaxSize(const RingBuffer *buffer) { - return buffer->sizeMask; -} - -void ringBufferAppendOne(RingBuffer *buffer, uint8_t data){ - buffer->data[buffer->tail] = data; - buffer->tail = (buffer->tail + 1) & buffer->sizeMask; -} - -void ringBufferAppendMultiple(RingBuffer *buffer, const uint8_t *data, size_t len){ - if(buffer->tail + len > buffer->sizeMask) { - uint32_t lenToTheEnd = buffer->sizeMask - buffer->tail + 1; - uint32_t lenFromBegin = len - lenToTheEnd; - memcpy(buffer->data + buffer->tail, data, lenToTheEnd); - memcpy(buffer->data, data + lenToTheEnd, lenFromBegin); - } - else { - memcpy(buffer->data + buffer->tail, data, len); - } - buffer->tail = (buffer->tail + len) & buffer->sizeMask; -} - -uint8_t ringBufferPeekOne(const RingBuffer *buffer){ - return buffer->data[buffer->head]; -} - -uint8_t ringBufferGetOne(RingBuffer *buffer){ - uint8_t data = buffer->data[buffer->head]; - buffer->head = (buffer->head + 1) & buffer->sizeMask; - return data; -} - -void ringBufferGetMultiple(RingBuffer *buffer, uint8_t *dst, size_t len) { - ringBufferPeekMultiple(buffer, dst, len); - buffer->head = (buffer->head + len) & buffer->sizeMask; -} - -void ringBufferPeekMultiple(const RingBuffer *buffer, uint8_t *dst, size_t len){ - if(buffer->head + len > buffer->sizeMask) { - uint32_t lenToTheEnd = buffer->sizeMask - buffer->head + 1; - uint32_t lenFromBegin = len - lenToTheEnd; - memcpy(dst, buffer->data + buffer->head, lenToTheEnd); - memcpy(dst + lenToTheEnd, buffer->data, lenFromBegin); - } - else { - memcpy(dst, buffer->data + buffer->head, len); - } -} - -void ringBufferDiscardMultiple(RingBuffer *buffer, size_t len){ - buffer->head = (buffer->head + len) & buffer->sizeMask; -} - -void ringBufferClear(RingBuffer *buffer){ - buffer->head = buffer->tail = 0; -} diff --git a/buffer/ringbuffer.h b/buffer/ringbuffer.h deleted file mode 100644 index fa8c7f4..0000000 --- a/buffer/ringbuffer.h +++ /dev/null @@ -1,74 +0,0 @@ -#ifndef __RING_BUFFER_H_ -#define __RING_BUFFER_H_ - -// Uncomment if your platform does not have memcopy implementation. -// #define NO_MEM_COPY - -#include -#include - - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct { - size_t tail; - size_t head; - size_t sizeMask; - uint8_t *data; -}RingBuffer; - -// Init Data Structure. -int ringBufferInit(RingBuffer *buffer, uint8_t *data, size_t len); - -// Returns ring buffer len. -size_t ringBufferLen(const RingBuffer *buffer); - -// Returns whether the ringbuferr is empty. -uint8_t ringBufferEmpty(const RingBuffer *buffer); - -// Available space left in the ringbuffer. -size_t ringBufferFreeSpace(const RingBuffer *buffer); - -// Maximum number of bytes this ring buffer can store. -size_t ringBufferMaxSize(const RingBuffer *buffer); - -// Append one byte to the buffer. -void ringBufferAppendOne(RingBuffer *buffer, uint8_t data); - -// Append multiple bytest to the buffer. -void ringBufferAppendMultiple(RingBuffer *buffer, const uint8_t *data, size_t len); - -// Return the first element but don't remove it. -uint8_t ringBufferPeekOne(const RingBuffer *buffer); - -// Return the first element and remove it from the buffer. -uint8_t ringBufferGetOne(RingBuffer *buffer); - -// Return len number of bytes from the ring buffer. -void ringBufferGetMultiple(RingBuffer *buffer, uint8_t *dst, size_t len); - -// Return len number of bytes without removing from the buffer. -void ringBufferPeekMultiple(const RingBuffer *buffer, uint8_t *dst, size_t len); - -// Discard len bytes from the buffer. -void ringBufferDiscardMultiple(RingBuffer *buffer, size_t len); - -// Clean the buffer to th initial state. -void ringBufferClear(RingBuffer *buffer); - -static inline int isMultipleTwo(size_t len) -{ - if(!(len && !(len & (len - 1)))) { - return 0; - } else { - return 1; - } -} - -#ifdef __cplusplus - } -#endif - -#endif diff --git a/mod_openai_audio_stream.c b/mod_openai_audio_stream.c index 945c3ab..00da472 100644 --- a/mod_openai_audio_stream.c +++ b/mod_openai_audio_stream.c @@ -170,12 +170,12 @@ static switch_status_t send_json(switch_core_session_t *session, char* json) { #define STREAM_API_SYNTAX \ "USAGE:\n" \ "--------------------------------------------------------------------------------\n" \ -"uuid_openai_audio_stream [start | stop | send_json | pause | resume |\n" \ -" mute | unmute]\n" \ -" [wss-url | path | user | openai | all | base64json]\n" \ -" [mono | mixed | stereo]\n" \ -" [8000 | 16000 | 24000]\n" \ -" [mute_user]\n" \ +"uuid_openai_audio_stream start \n" \ +" [8k | 16k | 24k | ] [mute_user]\n" \ +" where = 8k|16k|24k or any multiple of 8000 (default: 8k)\n" \ +"uuid_openai_audio_stream [stop | pause | resume]\n" \ +"uuid_openai_audio_stream [mute | unmute] [user | openai | all]\n" \ +"uuid_openai_audio_stream send_json \n" \ "--------------------------------------------------------------------------------\n" typedef enum { diff --git a/mod_openai_audio_stream.h b/mod_openai_audio_stream.h index 811d472..228d389 100644 --- a/mod_openai_audio_stream.h +++ b/mod_openai_audio_stream.h @@ -4,7 +4,6 @@ #include #include #include -#include "buffer/ringbuffer.h" #define MY_BUG_NAME "audio_stream" #define MAX_SESSION_ID (256) @@ -33,9 +32,7 @@ struct private_data { int user_audio_muted:1; int openai_audio_muted:1; int close_requested:1; - RingBuffer *buffer; switch_buffer_t *sbuffer; - uint8_t *data; int rtp_packets; switch_buffer_t *playback_buffer; }; diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index 61ccf6e..3d12c67 100644 --- a/openai_audio_streamer_glue.cpp +++ b/openai_audio_streamer_glue.cpp @@ -529,34 +529,10 @@ namespace { switch_mutex_init(&tech_pvt->mutex, SWITCH_MUTEX_NESTED, pool); - if (static_cast(desiredSampling) != sampling) { - if (switch_buffer_create(pool, &tech_pvt->sbuffer, buflen) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, - "%s: Error creating switch buffer.\n", tech_pvt->sessionId); - return SWITCH_STATUS_FALSE; - } - } else { - size_t adjSize = 1; //adjust the buffer size to the closest pow2 size - while(adjSize < buflen) { - adjSize *= 2; - } - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s: initializing buffer(%zu) to adjusted %zu bytes\n", - tech_pvt->sessionId, buflen, adjSize); - tech_pvt->data = static_cast(switch_core_alloc(pool, adjSize)); - if (!tech_pvt->data) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, - "%s: Error allocating memory for data buffer.\n", tech_pvt->sessionId); - return SWITCH_STATUS_FALSE; - } - memset(tech_pvt->data, 0, adjSize); - tech_pvt->buffer = static_cast(switch_core_alloc(pool, sizeof(RingBuffer))); - if (!tech_pvt->buffer) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, - "%s: Error allocating memory for ring buffer.\n", tech_pvt->sessionId); - return SWITCH_STATUS_FALSE; - } - memset(tech_pvt->buffer, 0, sizeof(RingBuffer)); - ringBufferInit(tech_pvt->buffer, tech_pvt->data, adjSize); + if (switch_buffer_create(pool, &tech_pvt->sbuffer, buflen) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "%s: Error creating switch buffer.\n", tech_pvt->sessionId); + return SWITCH_STATUS_FALSE; } if (static_cast(desiredSampling) != sampling) { @@ -797,9 +773,6 @@ extern "C" { switch_mutex_lock(tech_pvt->mutex); } - if (tech_pvt->buffer) { - ringBufferClear(tech_pvt->buffer); - } if (tech_pvt->sbuffer) { switch_buffer_zero(tech_pvt->sbuffer); } @@ -948,116 +921,140 @@ extern "C" { return SWITCH_STATUS_SUCCESS; } - switch_bool_t stream_frame(switch_media_bug_t *bug) - { - auto* tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); + switch_bool_t stream_frame(switch_media_bug_t *bug) { + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt || tech_pvt->audio_paused || tech_pvt->user_audio_muted) return SWITCH_TRUE; - if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { - - if (!tech_pvt->pAudioStreamer) { - switch_mutex_unlock(tech_pvt->mutex); - return SWITCH_TRUE; - } - - auto *pAudioStreamer = static_cast(tech_pvt->pAudioStreamer); + if (switch_mutex_trylock(tech_pvt->mutex) != SWITCH_STATUS_SUCCESS) { + return SWITCH_TRUE; + } - if(!pAudioStreamer->isConnected()) { - switch_mutex_unlock(tech_pvt->mutex); - return SWITCH_TRUE; - } + auto *pAudioStreamer = static_cast(tech_pvt->pAudioStreamer); - if (nullptr == tech_pvt->resampler) { - uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; - switch_frame_t frame = {}; - frame.data = data; - frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; - size_t inuse = ringBufferFreeSpace(tech_pvt->buffer); - while (switch_core_media_bug_read(bug, &frame, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { - if(frame.datalen) { - if (1 == tech_pvt->rtp_packets) { - pAudioStreamer->writeAudioDelta(static_cast(frame.data), frame.datalen); - continue; - } + if (!pAudioStreamer || !pAudioStreamer->isConnected()) { + switch_mutex_unlock(tech_pvt->mutex); + return SWITCH_TRUE; + } - size_t remaining = 0; - if(inuse >= frame.datalen) { - ringBufferAppendMultiple(tech_pvt->buffer, static_cast(frame.data), frame.datalen); - } else { - // The remaining space is not sufficient for the entire chunk - // so write first part up to the inuse space - ringBufferAppendMultiple(tech_pvt->buffer, static_cast(frame.data), inuse); - remaining = frame.datalen - inuse; - } + // Pre-allocate reusable buffers to avoid repeated allocations + std::vector flush_buffer; + std::vector resample_buffer; - if(0 == ringBufferFreeSpace(tech_pvt->buffer)) { - size_t nFrames = ringBufferLen(tech_pvt->buffer); - size_t nBytes = nFrames + remaining; - uint8_t chunkPtr[nBytes]; - ringBufferGetMultiple(tech_pvt->buffer, &chunkPtr[0], nBytes); + auto flush_sbuffer = [&]() { + switch_size_t inuse = switch_buffer_inuse(tech_pvt->sbuffer); + if (inuse > 0) { + flush_buffer.resize(inuse); + switch_buffer_read(tech_pvt->sbuffer, flush_buffer.data(), inuse); + switch_buffer_zero(tech_pvt->sbuffer); + pAudioStreamer->writeAudioDelta(flush_buffer.data(), inuse); + } + }; - if(remaining > 0) { - memcpy(&chunkPtr[nBytes - remaining], static_cast(frame.data) + frame.datalen - remaining, remaining); - } + std::vector data_buf(SWITCH_RECOMMENDED_BUFFER_SIZE); + switch_frame_t frame{}; + frame.data = data_buf.data(); + frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; - pAudioStreamer->writeAudioDelta(chunkPtr, nBytes); + while (switch_core_media_bug_read(bug, &frame, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { + // Validate frame data before processing + if (frame.datalen == 0 || frame.samples == 0) { + continue; + } - ringBufferClear(tech_pvt->buffer); + if (!tech_pvt->resampler) { + if (tech_pvt->rtp_packets == 1) { + pAudioStreamer->writeAudioDelta(static_cast(frame.data), frame.datalen); + } else { + size_t write_len = frame.datalen; + const uint8_t *write_data = static_cast(frame.data); + switch_size_t free_space = switch_buffer_freespace(tech_pvt->sbuffer); + if (write_len > free_space) { + flush_sbuffer(); + free_space = switch_buffer_freespace(tech_pvt->sbuffer); + } + // Only write if buffer has enough space + if (write_len <= free_space) { + switch_buffer_write(tech_pvt->sbuffer, write_data, write_len); + if (switch_buffer_freespace(tech_pvt->sbuffer) == 0) { + flush_sbuffer(); } - + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "%s: Dropping %zu bytes of audio data, buffer capacity exceeded\n", + tech_pvt->sessionId, write_len); } } - } else { - uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; - switch_frame_t frame = {}; - frame.data = data; - frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; - const size_t inuse = switch_buffer_freespace(tech_pvt->sbuffer); - - while (switch_core_media_bug_read(bug, &frame, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { - if(frame.datalen) { - spx_uint32_t in_len = frame.samples; - spx_uint32_t out_len = (inuse / (tech_pvt->channels * sizeof(spx_int16_t))); - spx_int16_t out[inuse / sizeof(spx_int16_t)]; - - if(tech_pvt->channels == 1) { - speex_resampler_process_int(tech_pvt->resampler, - 0, - static_cast(frame.data), - &in_len, - &out[0], - &out_len); - } else { - speex_resampler_process_interleaved_int(tech_pvt->resampler, - static_cast(frame.data), - &in_len, - &out[0], - &out_len); - } + continue; + } - if(out_len > 0) { - const size_t bytes_written = out_len * tech_pvt->channels * sizeof(spx_int16_t); - if (tech_pvt->rtp_packets == 1) { //20ms packet - pAudioStreamer->writeAudioDelta(reinterpret_cast(out), bytes_written); - continue; - } - if (bytes_written <= inuse) { - switch_buffer_write(tech_pvt->sbuffer, out, bytes_written); - } - } + size_t available = switch_buffer_freespace(tech_pvt->sbuffer); + spx_uint32_t in_len = frame.samples; + spx_uint32_t out_len = available / (tech_pvt->channels * sizeof(spx_int16_t)); + if (out_len == 0) { + flush_sbuffer(); + available = switch_buffer_freespace(tech_pvt->sbuffer); + out_len = available / (tech_pvt->channels * sizeof(spx_int16_t)); + // Skip processing if buffer still has no space after flushing + if (out_len == 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "%s: Buffer full, cannot process resampled frame\n", + tech_pvt->sessionId); + continue; + } + } + + resample_buffer.resize(out_len * tech_pvt->channels); + + if (tech_pvt->channels == 1) { + speex_resampler_process_int( + tech_pvt->resampler, + 0, + static_cast(frame.data), + &in_len, + resample_buffer.data(), + &out_len + ); + } else { + speex_resampler_process_interleaved_int( + tech_pvt->resampler, + static_cast(frame.data), + &in_len, + resample_buffer.data(), + &out_len + ); + } - if(switch_buffer_freespace(tech_pvt->sbuffer) == 0) { - const switch_size_t buf_len= switch_buffer_inuse(tech_pvt->sbuffer); - uint8_t buf_ptr[buf_len]; - switch_buffer_read(tech_pvt->sbuffer, buf_ptr, buf_len); - switch_buffer_zero(tech_pvt->sbuffer); - pAudioStreamer->writeAudioDelta(buf_ptr, buf_len); + size_t bytes_written = out_len * tech_pvt->channels * sizeof(spx_int16_t); + if (bytes_written > 0) { + // For 20ms packets, send immediately without buffering + if (tech_pvt->rtp_packets == 1) { + pAudioStreamer->writeAudioDelta(reinterpret_cast(resample_buffer.data()), bytes_written); + } else { + // Check if buffer has enough space before writing + switch_size_t free_space = switch_buffer_freespace(tech_pvt->sbuffer); + if (bytes_written > free_space) { + flush_sbuffer(); + free_space = switch_buffer_freespace(tech_pvt->sbuffer); + } + if (bytes_written <= free_space) { + switch_buffer_write( + tech_pvt->sbuffer, + reinterpret_cast(resample_buffer.data()), + bytes_written + ); + if (switch_buffer_freespace(tech_pvt->sbuffer) == 0) { + flush_sbuffer(); } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "%s: Dropping %zu bytes of resampled audio data, buffer capacity exceeded\n", + tech_pvt->sessionId, bytes_written); } } } - switch_mutex_unlock(tech_pvt->mutex); } + + switch_mutex_unlock(tech_pvt->mutex); return SWITCH_TRUE; }