634 changes: 634 additions & 0 deletions docs/HEARTBEAT_SYSTEM_README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion exe-requirements.txt
@@ -1,2 +1,2 @@
pyinstaller==6.12.0
pyinstaller==6.17.0
staticx @ git+https://github.com/Granulate/staticx.git@33eefdadc72832d5aa67c0792768c9e76afb746d; platform.machine == "x86_64"
606 changes: 606 additions & 0 deletions gprofiler/heartbeat.py

Large diffs are not rendered by default.
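
gprofiler/heartbeat.py (606 lines) is not rendered here. As an orientation aid only, the sketch below reconstructs the interface implied by its usage in main.py further down: the HeartbeatClient constructor arguments (api_server, service_name, server_token, verify) and the DynamicGProfilerManager calls (heartbeat_interval, start_heartbeat_loop, stop). The endpoint path, payload fields, and command handling are assumptions, not the PR's actual implementation.

```python
import logging
import time
from typing import Any, Dict, Optional

import requests

logger = logging.getLogger(__name__)


class HeartbeatClient:
    """Sends periodic heartbeats and fetches pending profiling commands.

    Only the constructor signature is taken from the usage in main.py;
    the endpoint path and payload shape below are illustrative guesses.
    """

    def __init__(self, api_server: str, service_name: str, server_token: str, verify: bool) -> None:
        self._api_server = api_server.rstrip("/")
        self._service_name = service_name
        self._session = requests.Session()
        self._session.headers["Authorization"] = f"Bearer {server_token}"
        self._session.verify = verify

    def send_heartbeat(self) -> Optional[Dict[str, Any]]:
        # Hypothetical endpoint; the real path lives in the unrendered heartbeat.py.
        resp = self._session.post(
            f"{self._api_server}/api/v1/heartbeat",
            json={"service_name": self._service_name},
            timeout=10,
        )
        resp.raise_for_status()
        if not resp.content:
            return None
        return resp.json() or None


class DynamicGProfilerManager:
    """Polls the server via HeartbeatClient and reacts to profiling commands."""

    heartbeat_interval: int = 30  # overridden from --heartbeat-interval in main.py

    def __init__(self, args: Any, heartbeat_client: HeartbeatClient) -> None:
        self._args = args
        self._client = heartbeat_client
        self._running = False

    def start_heartbeat_loop(self) -> None:
        self._running = True
        while self._running:
            try:
                command = self._client.send_heartbeat()
                if command:
                    logger.info("Received profiling command: %s", command)
                    # The real manager would start/stop/reconfigure a GProfiler instance here.
            except requests.RequestException as e:
                logger.warning("Heartbeat failed: %s", e)
            time.sleep(self.heartbeat_interval)

    def stop(self) -> None:
        self._running = False
```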

161 changes: 130 additions & 31 deletions gprofiler/main.py
@@ -33,7 +33,7 @@
from granulate_utils.linux.ns import is_root, is_running_in_init_pid
from granulate_utils.linux.process import is_process_running
from granulate_utils.metadata.cloud import get_aws_execution_env
from psutil import NoSuchProcess, Process
from psutil import NoSuchProcess, Process, process_iter
from requests import RequestException, Timeout

from gprofiler import __version__
@@ -48,6 +48,7 @@
from gprofiler.diagnostics import log_diagnostics, set_diagnostics
from gprofiler.exceptions import APIError, NoProfilersEnabledError
from gprofiler.gprofiler_types import ProcessToProfileData, UserArgs, integers_list, positive_integer
from gprofiler.heartbeat import DynamicGProfilerManager, HeartbeatClient
from gprofiler.hw_metrics import HWMetricsMonitor, HWMetricsMonitorBase, NoopHWMetricsMonitor
from gprofiler.log import RemoteLogsHandler, initial_root_logger_setup
from gprofiler.merge import concatenate_from_external_file, concatenate_profiles, merge_profiles
@@ -167,6 +168,8 @@ def __init__(
profiling_mode=profiling_mode,
container_names_client=container_names_client,
processes_to_profile=processes_to_profile,
max_processes_per_profiler=user_args.get("max_processes_per_profiler", 0),
max_system_processes_for_system_profilers=user_args.get("max_system_processes_for_system_profilers", 0),
)
self.system_profiler, self.process_profilers = get_profilers(user_args, profiler_state=self._profiler_state)
self._usage_logger = usage_logger
@@ -279,8 +282,37 @@ def start(self) -> None:
self._system_metrics_monitor.start()
self._hw_metrics_monitor.start()

# Check whether system-wide profilers (perf) should be skipped due to high process count
skip_system_profilers = False
if self._profiler_state.max_system_processes_for_system_profilers > 0:
try:
total_processes = len(list(process_iter()))
if total_processes > self._profiler_state.max_system_processes_for_system_profilers:
skip_system_profilers = True
logger.warning(
f"Skipping system profilers (perf) - {total_processes} processes exceed threshold "
f"of {self._profiler_state.max_system_processes_for_system_profilers}. "
f"Runtime profilers (py-spy, Java, etc.) will continue normally."
)
else:
logger.debug(
f"System process count: {total_processes} "
f"(threshold: {self._profiler_state.max_system_processes_for_system_profilers})"
)
except Exception as e:
logger.warning(f"Could not count system processes, continuing with all profilers: {e}")

for prof in list(self.all_profilers):
try:
# Skip system profilers if threshold exceeded
if (
skip_system_profilers
and hasattr(prof, "_is_system_wide_profiler")
and prof._is_system_wide_profiler()
):
logger.info(f"Skipping {prof.__class__.__name__} due to high system process count")
continue

prof.start()
except Exception:
# the SystemProfiler is handled separately - let the user run with '--perf-mode none' if they
@@ -594,6 +626,25 @@ def parse_cmd_args() -> configargparse.Namespace:
help="Comma separated list of processes that will be filtered to profile,"
" given multiple times will append pids to one list",
)
parser.add_argument(
"--max-processes-runtime-profiler",
dest="max_processes_per_profiler",
type=positive_integer,
default=0,
help="Maximum number of processes to profile per runtime profiler (0=unlimited). "
"When exceeded, profiles only the top N processes by CPU usage. "
"Does not affect system-wide profilers (perf, eBPF). Default: %(default)s",
)
parser.add_argument(
"--skip-system-profilers-above",
dest="max_system_processes_for_system_profilers",
type=positive_integer,
default=0,
help="Skip system-wide profilers (perf only) when total system processes exceed this threshold (0=unlimited). "
"When exceeded, prevents the perf profiler from starting, to reduce resource usage on busy systems. "
"PyPerf has its own threshold via --python-skip-pyperf-profiler-above. "
"Runtime profilers (py-spy, Java, etc.) continue normally, subject to --max-processes-runtime-profiler limiting. Default: %(default)s",
)
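
The help text above states that when the per-profiler limit is exceeded, only the top N processes by CPU usage are profiled. The selection itself is not visible in this diff (the limit flows in via ProfilerState.max_processes_per_profiler); the snippet below is a minimal sketch of one way such a ranking could be done with psutil, with the function name and sampling window being illustrative choices rather than the PR's implementation.

```python
import time
from typing import List

import psutil


def select_top_processes_by_cpu(candidates: List[psutil.Process], max_processes: int) -> List[psutil.Process]:
    """Return at most max_processes processes, ranked by recent CPU usage (illustrative only)."""
    if max_processes <= 0 or len(candidates) <= max_processes:
        return candidates  # 0 means unlimited

    # Prime cpu_percent(): the first call on a new Process object always returns 0.0.
    for proc in candidates:
        try:
            proc.cpu_percent(interval=None)
        except psutil.Error:
            pass
    time.sleep(0.5)  # short sampling window

    def cpu(proc: psutil.Process) -> float:
        try:
            return proc.cpu_percent(interval=None)
        except psutil.Error:
            return 0.0

    return sorted(candidates, key=cpu, reverse=True)[:max_processes]
```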
parser.add_argument(
"--rootless",
action="store_true",
@@ -861,6 +912,22 @@ def parse_cmd_args() -> configargparse.Namespace:
"The file modification indicates the last snapshot time.",
)

parser.add_argument(
"--enable-heartbeat-server",
action="store_true",
dest="enable_heartbeat_server",
default=False,
help="Enable heartbeat communication with server for dynamic profiling commands",
)

parser.add_argument(
"--heartbeat-interval",
type=positive_integer,
dest="heartbeat_interval",
default=30,
help="Interval in seconds for sending heartbeats to server (default: %(default)s)",
)

if is_linux() and not is_aarch64():
hw_metrics_options = parser.add_argument_group("hardware metrics")
hw_metrics_options.add_argument(
@@ -936,6 +1003,14 @@ def parse_cmd_args() -> configargparse.Namespace:
if args.profile_spawned_processes and args.pids_to_profile is not None:
parser.error("--pids is not allowed when profiling spawned processes")

if args.enable_heartbeat_server:
if not args.upload_results:
parser.error("--enable-heartbeat-server requires --upload-results to be enabled")
if not args.server_token:
parser.error("--enable-heartbeat-server requires --token to be provided")
if not args.service_name:
parser.error("--enable-heartbeat-server requires --service-name to be provided")

return args


@@ -1215,37 +1290,61 @@ def main() -> None:

ApplicationIdentifiers.init(enrichment_options)
set_diagnostics(args.diagnostics)
gprofiler = GProfiler(
output_dir=args.output_dir,
flamegraph=args.flamegraph,
rotating_output=args.rotating_output,
rootless=args.rootless,
profiler_api_client=profiler_api_client,
collect_metrics=args.collect_metrics,
collect_metadata=args.collect_metadata,
enrichment_options=enrichment_options,
state=state,
usage_logger=usage_logger,
user_args=args.__dict__,
duration=args.duration,
profile_api_version=args.profile_api_version,
profiling_mode=args.profiling_mode,
collect_hw_metrics=getattr(args, "collect_hw_metrics", False),
profile_spawned_processes=args.profile_spawned_processes,
remote_logs_handler=remote_logs_handler,
controller_process=controller_process,
processes_to_profile=processes_to_profile,
external_metadata_path=external_metadata_path,
heartbeat_file_path=heartbeat_file_path,
perfspect_path=perfspect_path,
perfspect_duration=getattr(args, "tool_perfspect_duration", 60),
verbose=args.verbose,
)
logger.info("gProfiler initialized and ready to start profiling")
if args.continuous:
gprofiler.run_continuous()

# Check if heartbeat server mode is enabled FIRST
if args.enable_heartbeat_server:
# Create heartbeat client
heartbeat_client = HeartbeatClient(
api_server=args.api_server,
service_name=args.service_name,
server_token=args.server_token,
verify=args.verify,
)

# Create dynamic profiler manager
manager = DynamicGProfilerManager(args, heartbeat_client)
manager.heartbeat_interval = args.heartbeat_interval

try:
logger.info("Starting heartbeat mode - waiting for server commands...")
manager.start_heartbeat_loop()
except KeyboardInterrupt:
logger.info("Received interrupt signal, stopping heartbeat mode...")
finally:
manager.stop()
else:
gprofiler.run_single()
# Normal profiling mode
gprofiler = GProfiler(
output_dir=args.output_dir,
flamegraph=args.flamegraph,
rotating_output=args.rotating_output,
rootless=args.rootless,
profiler_api_client=profiler_api_client,
collect_metrics=args.collect_metrics,
collect_metadata=args.collect_metadata,
enrichment_options=enrichment_options,
state=state,
usage_logger=usage_logger,
user_args=args.__dict__,
duration=args.duration,
profile_api_version=args.profile_api_version,
profiling_mode=args.profiling_mode,
collect_hw_metrics=getattr(args, "collect_hw_metrics", False),
profile_spawned_processes=args.profile_spawned_processes,
remote_logs_handler=remote_logs_handler,
controller_process=controller_process,
processes_to_profile=processes_to_profile,
external_metadata_path=external_metadata_path,
heartbeat_file_path=heartbeat_file_path,
perfspect_path=perfspect_path,
perfspect_duration=getattr(args, "tool_perfspect_duration", 60),
verbose=args.verbose,
)
logger.info("gProfiler initialized and ready to start profiling")
if args.continuous:
gprofiler.run_continuous()
else:
gprofiler.run_single()

except KeyboardInterrupt:
pass
2 changes: 2 additions & 0 deletions gprofiler/profiler_state.py
@@ -25,6 +25,8 @@ class ProfilerState:
profiling_mode: str
container_names_client: Optional[ContainerNamesClient]
processes_to_profile: Optional[List[Process]]
max_processes_per_profiler: int
max_system_processes_for_system_profilers: int

def __post_init__(self) -> None:
self._temporary_dir = TemporaryDirectoryWithMode(dir=self.storage_dir, mode=0o755)
43 changes: 43 additions & 0 deletions gprofiler/profilers/perf.py
@@ -125,6 +125,30 @@ def add_highest_avg_depth_stacks_per_process(
action="store_false",
dest="perf_memory_restart",
),
ProfilerArgument(
"--perf-use-cgroups",
help="Use cgroup-based profiling instead of PID-based profiling for better reliability. "
"Profiles the top N cgroups by resource usage, avoiding crashes from invalid PIDs.",
action="store_true",
default=False,
dest="perf_use_cgroups",
),
ProfilerArgument(
"--perf-max-cgroups",
help="Maximum number of cgroups to profile when using --perf-use-cgroups. Default: %(default)s",
type=int,
default=50,
dest="perf_max_cgroups",
),
ProfilerArgument(
"--perf-max-docker-containers",
help="Maximum number of individual Docker containers to profile instead of the broad 'docker' cgroup. "
"When set, profiles the top N highest-resource individual containers rather than all containers together. "
"Set to 0 to use the broad 'docker' cgroup (default behavior). Default: %(default)s",
type=int,
default=0,
dest="perf_max_docker_containers",
),
],
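
The three cgroup arguments above describe profiling the top N cgroups (or individual Docker containers) by resource usage instead of passing PIDs to perf. The ranking itself happens inside the perf wrapper, which receives the use_cgroups/max_cgroups/max_docker_containers values further down but is not shown in this diff. The following is a minimal sketch of one way to rank cgroups on a cgroup-v2 host; the paths and the use of cumulative CPU time as the ranking metric are assumptions, not the PR's actual logic.

```python
from pathlib import Path
from typing import List

CGROUP_V2_ROOT = Path("/sys/fs/cgroup")  # assumes a unified (cgroup v2) hierarchy


def _cpu_usage_usec(cgroup_dir: Path) -> int:
    """Read cumulative CPU time (usage_usec) from a cgroup's cpu.stat; 0 if unreadable."""
    try:
        for line in (cgroup_dir / "cpu.stat").read_text().splitlines():
            key, _, value = line.partition(" ")
            if key == "usage_usec":
                return int(value)
    except (OSError, ValueError):
        pass
    return 0


def top_cgroups_by_cpu(max_cgroups: int) -> List[str]:
    """Return the relative paths of the busiest cgroups, e.g. for use with `perf record --cgroup`.

    Cumulative CPU time is used as a simple proxy for current load in this sketch.
    """
    candidates = [d for d in CGROUP_V2_ROOT.rglob("*") if d.is_dir()]
    ranked = sorted(candidates, key=_cpu_usage_usec, reverse=True)
    return [str(d.relative_to(CGROUP_V2_ROOT)) for d in ranked[:max_cgroups]]
```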
disablement_help="Disable the global perf of processes,"
" and instead only concatenate runtime-specific profilers results",
@@ -138,6 +162,10 @@ class SystemProfiler(ProfilerBase):
versions of Go processes.
"""

def _is_system_wide_profiler(self) -> bool:
"""Perf is a system-wide profiler that can be disabled on busy systems."""
return True

def __init__(
self,
frequency: int,
@@ -148,6 +176,9 @@ def __init__(
perf_inject: bool,
perf_node_attach: bool,
perf_memory_restart: bool,
perf_use_cgroups: bool = False,
perf_max_cgroups: int = 50,
perf_max_docker_containers: int = 0,
min_duration: int = 0,
):
super().__init__(frequency, duration, profiler_state, min_duration)
@@ -159,6 +190,12 @@ def __init__(
self._node_processes: List[Process] = []
self._node_processes_attached: List[Process] = []
self._perf_memory_restart = perf_memory_restart
self._perf_mode = perf_mode
self._perf_dwarf_stack_size = perf_dwarf_stack_size
self._perf_inject = perf_inject
self._perf_use_cgroups = perf_use_cgroups
self._perf_max_cgroups = perf_max_cgroups
self._perf_max_docker_containers = perf_max_docker_containers
switch_timeout_s = duration * 3 # allow gprofiler to be delayed up to 3 intervals before timing out.
extra_args = []
try:
@@ -184,6 +221,9 @@ def __init__(
extra_args=extra_args,
processes_to_profile=self._profiler_state.processes_to_profile,
switch_timeout_s=switch_timeout_s,
use_cgroups=self._perf_use_cgroups,
max_cgroups=self._perf_max_cgroups,
max_docker_containers=self._perf_max_docker_containers,
)
self._perfs.append(self._perf_fp)
else:
@@ -200,6 +240,9 @@ def __init__(
extra_args=extra_args,
processes_to_profile=self._profiler_state.processes_to_profile,
switch_timeout_s=switch_timeout_s,
use_cgroups=self._perf_use_cgroups,
max_cgroups=self._perf_max_cgroups,
max_docker_containers=self._perf_max_docker_containers,
)
self._perfs.append(self._perf_dwarf)
else: