Skip to content

Commit 8cbca9b

Browse files
committed
feat: add a GStreamer runner to launch pipelines
Add a new GStreamer runner to launch pipelines that gives use more control in errors and allow us to differentiate for example between a pipelines miss-configuration or a format that's not supported.
1 parent c892bbc commit 8cbca9b

File tree

4 files changed

+1091
-20
lines changed

4 files changed

+1091
-20
lines changed

fluster/decoders/gstreamer.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,20 @@
1818

1919

2020
import os
21-
import shlex
2221
import subprocess
2322
from functools import lru_cache
2423
from typing import List, Optional
2524

2625
from fluster.codec import Codec, OutputFormat
2726
from fluster.decoder import Decoder, register_decoder
27+
from fluster.gstreamer import run_pipeline
2828
from fluster.utils import (
2929
file_checksum,
3030
normalize_binary_cmd,
31-
run_command,
32-
run_command_with_output,
3331
)
3432

35-
PIPELINE_TPL = "{} --no-fault filesrc location={} ! {} ! {} ! {} ! {} {}"
36-
PIPELINE_TPL_FLU_H266_DEC = "{} --no-fault filesrc location={} ! {} ! {} ! {} {}"
33+
PIPELINE_TPL = "filesrc location={} ! {} ! {} ! {} ! {} {}"
34+
PIPELINE_TPL_FLU_H266_DEC = "filesrc location={} ! {} ! {} ! {} {}"
3735

3836

3937
@lru_cache(maxsize=None)
@@ -86,7 +84,6 @@ class GStreamer(Decoder):
8684
"""Base class for GStreamer decoders"""
8785

8886
decoder_bin = ""
89-
cmd = ""
9087
caps = ""
9188
gst_api = ""
9289
api = ""
@@ -99,7 +96,6 @@ def __init__(self) -> None:
9996
if not self.name:
10097
self.name = f"{self.provider}-{self.codec.value}-{self.api}-Gst{self.gst_api}"
10198
self.description = f"{self.provider} {self.codec.value} {self.api} decoder for GStreamer {self.gst_api}"
102-
self.cmd = normalize_binary_cmd(self.cmd)
10399

104100
if not gst_element_exists(self.sink):
105101
self.sink = "filesink"
@@ -113,7 +109,6 @@ def gen_pipeline(
113109
"""Generate the GStreamer pipeline used to decode the test vector"""
114110
output = f"location={output_filepath}" if output_filepath else ""
115111
return PIPELINE_TPL.format(
116-
self.cmd,
117112
input_filepath,
118113
self.parser if self.parser else "parsebin",
119114
self.decoder_bin,
@@ -157,31 +152,32 @@ def decode(
157152
if self.sink == "videocodectestsink":
158153
output_param = output_filepath if keep_files else None
159154
pipeline = self.gen_pipeline(input_filepath, output_param, output_format)
160-
command = shlex.split(pipeline)
161-
command.append("-m")
162-
data = run_command_with_output(command, timeout=timeout, verbose=verbose).splitlines()
155+
result = run_pipeline(pipeline, timeout=timeout, verbose=verbose, print_messages=True)
156+
if result.returncode != 0:
157+
raise subprocess.CalledProcessError(result.returncode, pipeline, result.stdout, result.stderr)
158+
data = result.stdout.splitlines()
163159
return self.parse_videocodectestsink_md5sum(data)
164160

165161
pipeline = self.gen_pipeline(input_filepath, output_filepath, output_format)
166-
run_command(shlex.split(pipeline), timeout=timeout, verbose=verbose)
162+
result = run_pipeline(pipeline, timeout=timeout, verbose=verbose)
163+
if result.returncode != 0:
164+
raise subprocess.CalledProcessError(result.returncode, pipeline, result.stdout, result.stderr)
167165
return file_checksum(output_filepath)
168166

169167
@lru_cache(maxsize=128)
170168
def check(self, verbose: bool) -> bool:
171169
"""Check if GStreamer decoder is valid (better than gst-inspect)"""
172170
try:
173-
binary = normalize_binary_cmd(f"gst-launch-{self.gst_api}")
174-
pipeline = f"{binary} --no-fault appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink"
175-
run_command(shlex.split(pipeline), verbose=verbose)
171+
pipeline = f"appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink"
172+
result = run_pipeline(pipeline, verbose=verbose)
173+
return result.returncode == 0
176174
except Exception:
177175
return False
178-
return True
179176

180177

181178
class GStreamer10Video(GStreamer):
182179
"""Base class for GStreamer 1.x video decoders"""
183180

184-
cmd = "gst-launch-1.0"
185181
caps = "video/x-raw"
186182
gst_api = "1.0"
187183
sink = "videocodectestsink"
@@ -205,7 +201,6 @@ def gen_pipeline(
205201
caps = f"{self.caps} ! videoconvert dither=none ! {raw_caps}"
206202
output = f"location={output_filepath}" if output_filepath else ""
207203
return PIPELINE_TPL.format(
208-
self.cmd,
209204
input_filepath,
210205
self.parser if self.parser else "parsebin",
211206
self.decoder_bin,
@@ -218,7 +213,6 @@ def gen_pipeline(
218213
class GStreamer10Audio(GStreamer):
219214
"""Base class for GStreamer 1.x audio decoders"""
220215

221-
cmd = "gst-launch-1.0"
222216
caps = "audio/x-raw"
223217
gst_api = "1.0"
224218
sink = "filesink"
@@ -789,7 +783,6 @@ def gen_pipeline(
789783
caps = f"{self.caps} ! videoconvert dither=none ! video/x-raw,format={output_format_to_gst(output_format)}"
790784
output = f"location={output_filepath}" if output_filepath else ""
791785
return PIPELINE_TPL_FLU_H266_DEC.format(
792-
self.cmd,
793786
input_filepath,
794787
self.decoder_bin,
795788
caps,

fluster/gstreamer/__init__.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Fluster - testing framework for decoders conformance
2+
# Copyright (C) 2025, Fluendo, S.A.
3+
# Author: Andoni Morales Alastruey <amorales@fluendo.com>, Fluendo, S.A.
4+
#
5+
# This library is free software; you can redistribute it and/or
6+
# modify it under the terms of the GNU Lesser General Public License
7+
# as published by the Free Software Foundation, either version 3
8+
# of the License, or (at your option) any later version.
9+
#
10+
# This library is distributed in the hope that it will be useful,
11+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
# Lesser General Public License for more details.
14+
#
15+
# You should have received a copy of the GNU Lesser General Public
16+
# License along with this library. If not, see <https://www.gnu.org/licenses/>.
17+
18+
"""
19+
GStreamer utilities for Fluster.
20+
21+
This package provides CFFI bindings for GStreamer and a pipeline runner
22+
that can be used to run GStreamer pipelines without depending on the
23+
GStreamer Python bindings (gi.repository.Gst).
24+
"""
25+
26+
import os
27+
import subprocess
28+
import sys
29+
from typing import Optional
30+
31+
from fluster.gstreamer.gst_cffi import GStreamerInstallation
32+
from fluster.gstreamer.runner import ExitCode as ExitCode
33+
34+
35+
def run_pipeline(
36+
pipeline: str,
37+
timeout: Optional[int] = None,
38+
verbose: bool = False,
39+
quiet: bool = False,
40+
print_messages: bool = False,
41+
) -> subprocess.CompletedProcess:
42+
"""
43+
Run a GStreamer pipeline in a subprocess with proper environment setup.
44+
45+
This is a convenience function that handles environment configuration and
46+
spawns the GStreamer runner as a subprocess. It's the recommended way to
47+
run GStreamer pipelines from fluster.
48+
49+
Args:
50+
pipeline: The GStreamer pipeline description string (gst-launch format).
51+
timeout: Timeout in seconds for the pipeline to complete. None for no timeout.
52+
verbose: Enable verbose output from the runner.
53+
quiet: Suppress output except errors.
54+
print_messages: Print all bus messages (like gst-launch -m).
55+
56+
Returns:
57+
subprocess.CompletedProcess with returncode, stdout, and stderr.
58+
59+
Exit codes (see ExitCode enum):
60+
SUCCESS (0) - Pipeline completed successfully (EOS)
61+
ERROR (1) - Pipeline error occurred
62+
INIT_ERROR (2) - Invalid arguments or initialization error
63+
TIMEOUT (3) - Timeout occurred
64+
"""
65+
cmd = [sys.executable, "-m", "fluster.gstreamer.runner"]
66+
if verbose:
67+
cmd.append("--verbose")
68+
if quiet:
69+
cmd.append("--quiet")
70+
if print_messages:
71+
cmd.append("--messages")
72+
if timeout is not None:
73+
cmd.extend(["--timeout", str(timeout)])
74+
cmd.append(pipeline)
75+
env = os.environ.copy()
76+
env.update(GStreamerInstallation().get_environment())
77+
return subprocess.run(cmd, env=env, capture_output=True, text=True, check=False)

0 commit comments

Comments
 (0)