Skip to content

Commit d3748e2

Browse files
author
Michal Adamczak
committed
StreamBuffer for input
Alternative input system for LPMS. The idea is to allow using of FFmpeg demultiplexers with any input mechanism, such as network packet delivery. In order to do that we have to provide custom AVIOContext with custom read() and seek() operations. This also means that synchronization is needed because we will have to perform input and transcoding in different threads (for example demux - which is a part of transcode - may want more data which is not available yet, then transcode should block until input thread delivers more data)
1 parent 13446f1 commit d3748e2

File tree

7 files changed

+385
-7
lines changed

7 files changed

+385
-7
lines changed

ffmpeg/decoder.c

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ int flush_in(struct input_ctx *ictx, AVFrame *frame, int *stream_index)
7373
return AVERROR_EOF;
7474
}
7575

76-
// FIXME: name me and the other function better
76+
// FIXME: name me and the other function better (and move to utilities...)
7777
enum AVPixelFormat hw2pixfmt(AVCodecContext *ctx)
7878
{
7979
const AVCodec *decoder = ctx->codec;
@@ -254,7 +254,26 @@ static void close_video_decoder(struct input_ctx *ictx)
254254
if (ictx->last_frame_v) av_frame_free(&ictx->last_frame_v);
255255
}
256256

257-
int open_input(input_params *params, struct input_ctx *ctx)
257+
int open_demuxer(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer)
258+
{
259+
int ret;
260+
ctx->ic = avformat_alloc_context();
261+
if (buffer) {
262+
// using custom input
263+
if (buffer_setup_as_input(buffer, ctx->ic) < 0) {
264+
// memory allocation failure
265+
return -1;
266+
}
267+
// instruct FFmpeg to use our input, note that file name is empty
268+
ret = avformat_open_input(&ctx->ic, "", NULL, NULL);
269+
} else {
270+
// normal file-based input, note that file name is passed
271+
ret = avformat_open_input(&ctx->ic, params->fname, NULL, NULL);
272+
}
273+
return ret;
274+
}
275+
276+
int open_input(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer)
258277
{
259278
char *inp = params->fname;
260279
int ret = 0;
@@ -268,7 +287,8 @@ int open_input(input_params *params, struct input_ctx *ctx)
268287
ctx->device = params->device;
269288

270289
// open demuxer
271-
ret = avformat_open_input(&ctx->ic, inp, NULL, NULL);
290+
ret = open_demuxer(params, ctx, buffer);
291+
// TODO: maybe move stuff below to open_demuxer?
272292
if (ret < 0) LPMS_ERR(open_input_err, "demuxer: Unable to open input");
273293
ret = avformat_find_stream_info(ctx->ic, NULL);
274294
if (ret < 0) LPMS_ERR(open_input_err, "Unable to find input info");
@@ -341,4 +361,3 @@ void free_input(struct input_ctx *ictx, enum FreeInputPolicy policy)
341361
// audio decoder is always closed
342362
close_audio_decoder(ictx);
343363
}
344-

ffmpeg/decoder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <libavcodec/avcodec.h>
66
#include <libavutil/opt.h>
77
#include "transcoder.h"
8+
#include "stream_buffer.h"
89

910
struct input_ctx {
1011
AVFormatContext *ic; // demuxer required
@@ -67,7 +68,7 @@ int demux_in(struct input_ctx *ictx, AVPacket *pkt);
6768
int decode_in(struct input_ctx *ictx, AVPacket *pkt, AVFrame *frame, int *stream_index);
6869
int flush_in(struct input_ctx *ictx, AVFrame *frame, int *stream_index);
6970
int process_in(struct input_ctx *ictx, AVFrame *frame, AVPacket *pkt, int *stream_index);
70-
int open_input(input_params *params, struct input_ctx *ctx);
71+
int open_input(input_params *params, struct input_ctx *ctx, StreamBuffer *buffer);
7172
void free_input(struct input_ctx *inctx, enum FreeInputPolicy policy);
7273
// this should perhaps be moved to some utility file, as it is not decoder
7374
// specific

ffmpeg/ffmpeg.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,36 @@ func isAudioAllDrop(ps []TranscodeOptions) bool {
607607
return true
608608
}
609609

610+
// load input file into input buffer
611+
func loadInputBuffer(t *Transcoder, input *TranscodeOptionsIn) {
612+
if strings.HasPrefix(strings.ToLower(input.Fname), "pipe:") {
613+
C.lpms_transcode_push_reset(t.handle, 0)
614+
fmt.Println("Skipping buffer input for pipe")
615+
return
616+
}
617+
C.lpms_transcode_push_reset(t.handle, 1)
618+
data, err := os.ReadFile(input.Fname)
619+
if nil != err {
620+
// error loading file
621+
// translate Go error into proper FFmpeg error to get expected behavior
622+
// of tests
623+
var error C.int
624+
if errors.Is(err, os.ErrNotExist) {
625+
error = 1
626+
} else {
627+
error = 0
628+
}
629+
C.lpms_transcode_push_error(t.handle, error);
630+
fmt.Println("Error while loading the queue", err)
631+
} else {
632+
// file loaded fine
633+
C.lpms_transcode_push_bytes(t.handle, (*C.uchar)(unsafe.Pointer(&data[0])), C.int(len(data)))
634+
C.lpms_transcode_push_eof(t.handle)
635+
fmt.Println(len(data), "bytes of input loaded into buffer")
636+
}
637+
}
638+
639+
610640
// create C output params array and return it along with corresponding finalizer
611641
// function that makes sure there are no C memory leaks
612642
func createCOutputParams(input *TranscodeOptionsIn, ps []TranscodeOptions) ([]C.output_params, func(), error) {
@@ -929,6 +959,7 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
929959
paramsPointer = (*C.output_params)(&params[0])
930960
resultsPointer = (*C.output_results)(&results[0])
931961
}
962+
loadInputBuffer(t, input)
932963
ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded))
933964
if ret != 0 {
934965
if LogTranscodeErrors {

ffmpeg/stream_buffer.c

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#include "stream_buffer.h"
2+
3+
int buffer_create(StreamBuffer *buffer)
4+
{
5+
buffer->data = (uint8_t *)malloc(STREAM_BUFFER_BYTES);
6+
if (!buffer->data) return -1;
7+
pthread_mutex_init(&buffer->mutex, NULL);
8+
pthread_cond_init(&buffer->condition_put, NULL);
9+
pthread_cond_init(&buffer->condition_get, NULL);
10+
buffer_reset(buffer);
11+
return 0;
12+
}
13+
14+
void buffer_destroy(StreamBuffer *buffer)
15+
{
16+
if (buffer->data) free(buffer->data);
17+
pthread_mutex_destroy(&buffer->mutex);
18+
pthread_cond_destroy(&buffer->condition_put);
19+
pthread_cond_destroy(&buffer->condition_get);
20+
buffer->data = NULL;
21+
buffer->flags = 0;
22+
}
23+
24+
static int64_t remaining(StreamBuffer *buffer)
25+
{
26+
return STREAM_BUFFER_BYTES - buffer->unread_bytes - PROTECTED_BYTES;
27+
}
28+
29+
static int ffmpeg_error(StreamErrors errors)
30+
{
31+
switch (errors) {
32+
case NO_ENTRY: return AVERROR(ENOENT);
33+
default: return AVERROR(EIO);
34+
}
35+
}
36+
37+
static int buffer_read_function(void *user_data, uint8_t *buf, int buf_size)
38+
{
39+
StreamBuffer *buffer = (StreamBuffer *)user_data;
40+
int64_t to_read, end_offset, trailing, first_copy, second_copy;
41+
int ret = 0;
42+
pthread_mutex_lock(&buffer->mutex);
43+
if (buffer->flags & STREAM_ERROR) {
44+
// there was an error, emulate FFmpeg behavior
45+
ret = ffmpeg_error(buffer->error);
46+
goto read_error;
47+
}
48+
// wait for end of stream or some unread data
49+
while (!(END_OF_STREAM & buffer->flags) && !buffer->unread_bytes) {
50+
pthread_cond_wait(&buffer->condition_put, &buffer->mutex);
51+
}
52+
if ((END_OF_STREAM & buffer->flags) && !buffer->unread_bytes) {
53+
// no unread data and none is coming, that is an EOF
54+
ret = AVERROR_EOF;
55+
goto read_error;
56+
}
57+
// have some data to read, copy them out
58+
to_read = (buf_size <= buffer->unread_bytes) ? buf_size : buffer->unread_bytes;
59+
end_offset = (buffer->index + buffer->read_bytes) % STREAM_BUFFER_BYTES;
60+
trailing = STREAM_BUFFER_BYTES - end_offset;
61+
first_copy = (trailing >= to_read) ? to_read : trailing;
62+
memcpy(buf, buffer->data + end_offset, first_copy);
63+
second_copy = to_read - first_copy;
64+
memcpy(buf + first_copy, buffer->data, second_copy);
65+
// update buffer
66+
buffer->read_bytes += to_read;
67+
buffer->unread_bytes -= to_read;
68+
pthread_mutex_unlock(&buffer->mutex);
69+
pthread_cond_signal(&buffer->condition_get);
70+
return to_read;
71+
72+
read_error:
73+
pthread_mutex_unlock(&buffer->mutex);
74+
return ret;
75+
}
76+
77+
static int64_t seek_to(StreamBuffer *buffer, int64_t pos)
78+
{
79+
int64_t available = buffer->read_bytes + buffer->unread_bytes;
80+
int64_t delta = pos - buffer->index;
81+
if (delta < 0) {
82+
// attempt to seek before the start of the buffer
83+
return -1;
84+
}
85+
if (available < delta) {
86+
// attempt to seek after the end of the buffer
87+
return -1;
88+
}
89+
90+
// execute seek
91+
buffer->read_bytes = delta;
92+
buffer->unread_bytes = available - delta;
93+
return pos;
94+
}
95+
96+
static int64_t buffer_seek_function(void *user_data, int64_t pos, int whence)
97+
{
98+
StreamBuffer *buffer = (StreamBuffer *)user_data;
99+
// remove force flag
100+
whence &= ~AVSEEK_FORCE;
101+
pthread_mutex_lock(&buffer->mutex);
102+
int ret;
103+
if (buffer->flags & STREAM_ERROR) {
104+
// there was an error, emulate FFmpeg behavior
105+
ret = ffmpeg_error(buffer->error);
106+
goto seek_finish;
107+
}
108+
// FFmpeg ORs in some extra flags, so have to use AND
109+
if (AVSEEK_SIZE & whence) {
110+
if (buffer->flags & END_OF_STREAM) {
111+
// already have all the data so can say
112+
ret = buffer->index + buffer->read_bytes + buffer->unread_bytes;
113+
goto seek_finish;
114+
} else {
115+
// not supported, because we cannot be sure how many bytes will arrive over
116+
// queue
117+
ret = -1;
118+
}
119+
goto seek_finish;
120+
}
121+
if (SEEK_END == whence) {
122+
if (END_OF_STREAM & buffer->flags) {
123+
// possible, because we reached end of stream already
124+
ret = seek_to(buffer, buffer->index + buffer->read_bytes + buffer->unread_bytes + pos);
125+
} else {
126+
// can't do that because we haven't seen the end yet
127+
ret = -1;
128+
}
129+
} else if (SEEK_SET == whence) {
130+
ret = seek_to(buffer, pos);
131+
} else if (SEEK_CUR == whence) {
132+
ret = seek_to(buffer, buffer->index + pos);
133+
}
134+
135+
seek_finish:
136+
pthread_mutex_unlock(&buffer->mutex);
137+
pthread_cond_signal(&buffer->condition_get);
138+
return ret;
139+
}
140+
141+
int buffer_setup_as_input(StreamBuffer *buffer, AVFormatContext *ctx)
142+
{
143+
// IMPORTANT: I am not sure if ffmpeg documentation states that explicitly,
144+
// but the memory of ctx->pb as well as its io_buffer seem to be released when
145+
// ctx will get closed. I tried otherwise and got "double free" errors
146+
#define BUFFER_SIZE 4096
147+
void *io_buffer = av_malloc(BUFFER_SIZE);
148+
if (!io_buffer) return -1;
149+
ctx->pb = avio_alloc_context(
150+
io_buffer, BUFFER_SIZE, // buffer and size
151+
0, // do not write, just read
152+
buffer, // pass buffer as user data
153+
buffer_read_function,
154+
NULL, // no write function supplied
155+
buffer_seek_function);
156+
if (!ctx->pb) return -1;
157+
ctx->flags |= AVFMT_FLAG_CUSTOM_IO;
158+
return 0;
159+
}
160+
161+
void buffer_reset(StreamBuffer *buffer)
162+
{
163+
buffer->index = buffer->read_bytes = buffer->unread_bytes = 0;
164+
buffer->flags = 0;
165+
buffer->error = 0;
166+
}
167+
168+
void buffer_put_bytes(StreamBuffer *buffer, uint8_t *bytes, int64_t size)
169+
{
170+
int64_t space, end, end_offset, trailing, first_copy, second_copy, deficit;
171+
pthread_mutex_lock(&buffer->mutex);
172+
while (!remaining(buffer)) {
173+
// wait until there is some free(able) space in the buffer
174+
pthread_cond_wait(&buffer->condition_get, &buffer->mutex);
175+
}
176+
// now see how much we can write
177+
space = remaining(buffer);
178+
if (space < size) size = space;
179+
// here we know that we can write
180+
end = buffer->index + buffer->read_bytes + buffer->unread_bytes;
181+
end_offset = end % STREAM_BUFFER_BYTES;
182+
// be careful to wrap around write
183+
trailing = STREAM_BUFFER_BYTES - end_offset;
184+
first_copy = (size <= trailing) ? size : trailing;
185+
memcpy(buffer->data + end_offset, bytes, first_copy);
186+
second_copy = size - first_copy;
187+
memcpy(buffer->data, bytes + first_copy, second_copy);
188+
// unread bytes changes obviously
189+
buffer->unread_bytes += size;
190+
// see if we should move index and change read_bytes
191+
deficit = STREAM_BUFFER_BYTES - buffer->read_bytes - buffer->unread_bytes;
192+
if (deficit < 0) {
193+
// yeah
194+
buffer->index -= deficit;
195+
buffer->read_bytes += deficit;
196+
}
197+
pthread_mutex_unlock(&buffer->mutex);
198+
// signal reader that it can proceed
199+
pthread_cond_signal(&buffer->condition_put);
200+
}
201+
202+
void buffer_end_of_stream(StreamBuffer *buffer)
203+
{
204+
pthread_mutex_lock(&buffer->mutex);
205+
buffer->flags = END_OF_STREAM;
206+
pthread_mutex_unlock(&buffer->mutex);
207+
pthread_cond_signal(&buffer->condition_put);
208+
}
209+
210+
void buffer_error(StreamBuffer *buffer, StreamErrors error)
211+
{
212+
pthread_mutex_lock(&buffer->mutex);
213+
// set flags to both error and end of stream to get out of any waiting loop
214+
buffer->flags = STREAM_ERROR | END_OF_STREAM;
215+
buffer->error = error;
216+
pthread_mutex_unlock(&buffer->mutex);
217+
pthread_cond_signal(&buffer->condition_put);
218+
}
219+

0 commit comments

Comments
 (0)