From 36224e639779504fb59af9e9a96ddcd0281cdcb6 Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Mon, 20 Mar 2023 23:50:18 +0100 Subject: [PATCH 01/11] Add continuous collection of profiles in java profiler --- gprofiler/profilers/java.py | 137 ++++++++++++++++++++++++++++++++---- tests/test_java.py | 16 +++-- 2 files changed, 134 insertions(+), 19 deletions(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 24e17e786..a599e2af3 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -8,7 +8,8 @@ import re import secrets import signal -from enum import Enum +from dataclasses import dataclass +from enum import Enum, auto from pathlib import Path from subprocess import CompletedProcess from threading import Event, Lock @@ -409,6 +410,22 @@ def get_ap_version() -> str: r"KB\n\n" ) +# Status reported by async profiler when it's started. Defined here: +# https://github.com/async-profiler/async-profiler/blob/e3b7bfca227ae5c916f00abfacf0e61291df3a67/src/profiler.cpp#L1541 +_AP_STATUS_RUNNING_RE = re.compile(r"Profiling is running for (?P<uptime>\d+) seconds\n") + + +class AsyncProfilerStartStatus(Enum): + STARTED_BY_US = auto() + ALREADY_RUNNING = auto() + + +@dataclass(frozen=True) +class AsyncProfilerStatus: + message: Optional[str] + running: bool + uptime: int = 0 + class AsyncProfiledProcess: """ @@ -484,6 +501,8 @@ def __init__( self._mcache = mcache self._collect_meminfo = collect_meminfo self._include_method_modifiers = ",includemm" if include_method_modifiers else "" + self._started_by_us = False + self._setup_state: Optional[bool] = None def _find_rw_exec_dir(self, available_dirs: Sequence[str]) -> str: """ @@ -498,6 +517,12 @@ def _find_rw_exec_dir(self, available_dirs: Sequence[str]) -> str: raise NoRwExecDirectoryFoundError(f"Could not find a rw & exec directory out of {available_dirs}!") def __enter__(self: T) -> T: + self.setup() + return self + + def setup(self) -> None: + # assert java_mode == "ap", "Java 
profiler should not be initialized, wrong java_mode value given" + assert self._setup_state is None, "AsyncProfilerProcess must not be setup twice" os.makedirs(self._ap_dir_host, 0o755, exist_ok=True) os.makedirs(self._storage_dir_host, 0o755, exist_ok=True) @@ -509,8 +534,7 @@ def __enter__(self: T) -> T: self._recreate_log() # copy libasyncProfiler.so if needed self._copy_libap() - - return self + self._setup_state = True def __exit__( self, @@ -518,11 +542,16 @@ def __exit__( exc_val: Optional[BaseException], exc_ctb: Optional[TracebackType], ) -> None: + self.teardown() + + def teardown(self) -> None: + assert self._setup_state is True, "AsyncProfilerProcess must not run teardown twice" # ignore_errors because we are deleting paths via /proc/pid/root - and the pid # we're using might have gone down already. # remove them as best effort. remove_path(self._output_path_host, missing_ok=True) remove_path(self._log_path_host, missing_ok=True) + self._setup_state = False def _existing_realpath(self, path: str) -> Optional[str]: """ @@ -620,6 +649,19 @@ def _get_stop_cmd(self, with_output: bool) -> List[str]: f"{self._get_extra_ap_args()}" ] + def _get_dump_cmd(self, ap_timeout: int) -> List[str]: + return self._get_base_cmd() + [ + f"dump" + f",log={self._log_path_process}" + f"{self._get_ap_output_args()}" + f",timeout={ap_timeout}" + f"{',lib' if self._profiler_state.insert_dso_name else ''}{',meminfolog' if self._collect_meminfo else ''}" + f"{self._get_extra_ap_args()}" + ] + + def _get_status_cmd(self) -> List[str]: + return self._get_base_cmd() + ["status" f"{self._get_ap_output_args()}"] + def _read_ap_log(self) -> str: if not os.path.exists(self._log_path_host): return "(log file doesn't exist)" @@ -678,9 +720,11 @@ def _run_fdtransfer(self) -> None: timeout=self._FDTRANSFER_TIMEOUT, ) - def start_async_profiler(self, interval: int, second_try: bool = False, ap_timeout: int = 0) -> bool: + def start_async_profiler( + self, interval: int, second_try: bool = 
False, ap_timeout: int = 0 + ) -> AsyncProfilerStartStatus: """ - Returns True if profiling was started; False if it was already started. + Returns STARTED_BY_US if profiling was started; ALREADY_RUNNING if it was already started. ap_timeout defaults to 0, which means "no timeout" for AP (see call to startTimer() in profiler.cpp) """ if self._mode == "cpu" and not second_try: @@ -689,7 +733,9 @@ def start_async_profiler(self, interval: int, second_try: bool = False, ap_timeo start_cmd = self._get_start_cmd(interval, ap_timeout) try: self._run_async_profiler(start_cmd) - return True + if not self._started_by_us: + self._started_by_us = True + return AsyncProfilerStartStatus.STARTED_BY_US except JattachException as e: if e.is_ap_loaded: if ( @@ -697,12 +743,25 @@ def start_async_profiler(self, interval: int, second_try: bool = False, ap_timeo and e.get_ap_log() == "[ERROR] Profiler already started\n" ): # profiler was already running - return False + return AsyncProfilerStartStatus.ALREADY_RUNNING raise def stop_async_profiler(self, with_output: bool) -> str: return self._run_async_profiler(self._get_stop_cmd(with_output)) + def dump_from_async_profiler(self, ap_timeout: int = 0) -> str: + return self._run_async_profiler(self._get_dump_cmd(ap_timeout)) + + def get_async_profiler_status(self) -> AsyncProfilerStatus: + status_cmd = self._get_status_cmd() + self._run_async_profiler(status_cmd) + message = self.read_output() + if message is not None: + m = _AP_STATUS_RUNNING_RE.match(message) + if m is not None: + return AsyncProfilerStatus(message, True, int(m.group("uptime"))) + return AsyncProfilerStatus(message, False) + def read_output(self) -> Optional[str]: try: return Path(self._output_path_host).read_text() @@ -712,6 +771,9 @@ def read_output(self) -> Optional[str]: return None raise + def is_started_by_us(self) -> bool: + return self._started_by_us + @register_profiler( "Java", @@ -878,13 +940,17 @@ def __init__( self._jattach_jcmd_runner = JattachJcmdRunner( 
stop_event=self._profiler_state.stop_event, jattach_timeout=self._jattach_timeout ) - self._ap_timeout = self._duration + self._AP_EXTRA_TIMEOUT_S + # TODO: disable timeout until support for restart is added in async-profiler + # self._ap_timeout = self._duration + self._AP_EXTRA_TIMEOUT_S + self._ap_timeout = 0 application_identifiers.ApplicationIdentifiers.init_java(self._jattach_jcmd_runner) self._metadata = JavaMetadata( self._profiler_state.stop_event, self._jattach_jcmd_runner, self._collect_jvm_flags ) self._report_meminfo = java_async_profiler_report_meminfo self._include_method_modifiers = java_include_method_modifiers + # keep ap_process instances to enable continuous profiling + self._pid_to_ap_proc: Dict[int, Optional[AsyncProfiledProcess]] = {} def _init_ap_mode(self, profiling_mode: str, ap_mode: str) -> None: assert profiling_mode in ("cpu", "allocation"), "async-profiler support only cpu/allocation profiling modes" @@ -1050,6 +1116,26 @@ def _check_async_profiler_loaded(self, process: Process) -> bool: return False + def _get_ap_proc( + self, + process: Process, + profiler_state: ProfilerState, + mode: str, + ap_safemode: int, + ap_args: str, + jattach_timeout: int, + mcache: int, + collect_meminfo: bool, + ) -> AsyncProfiledProcess: + ap_proc: Optional[AsyncProfiledProcess] = self._pid_to_ap_proc.get(process.pid, None) + if ap_proc is None: + ap_proc = AsyncProfiledProcess( + process, profiler_state, mode, ap_safemode, ap_args, jattach_timeout, mcache, collect_meminfo + ) + ap_proc.setup() + self._pid_to_ap_proc[process.pid] = ap_proc + return ap_proc + def _profile_process(self, process: Process, duration: int, spawned: bool) -> ProfileData: comm = process_comm(process) exe = process_exe(process) @@ -1096,7 +1182,7 @@ def _profile_process(self, process: Process, duration: int, spawned: bool) -> Pr logger.debug("Process paths", pid=process.pid, execfn=execfn, exe=exe) logger.debug("Process mapped files", pid=process.pid, maps=set(m.path for m in 
process.memory_maps())) - with AsyncProfiledProcess( + ap_proc = self._get_ap_proc( process, self._profiler_state, self._mode, @@ -1106,8 +1192,8 @@ def _profile_process(self, process: Process, duration: int, spawned: bool) -> Pr self._ap_mcache, self._report_meminfo, self._include_method_modifiers, - ) as ap_proc: - stackcollapse = self._profile_ap_process(ap_proc, comm, duration) + ) + stackcollapse = self._profile_ap_process(ap_proc, comm, duration) return ProfileData(stackcollapse, appid, app_metadata, container_name) @@ -1128,9 +1214,13 @@ def _log_mem_usage(ap_log: str, pid: int) -> None: pid=pid, ) - def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount: + def _prepare_async_profiler(self, ap_proc: AsyncProfiledProcess) -> AsyncProfilerStartStatus: + # first time we need to start profiler ourselves + # afterwards we will check if it's still running by looking at its status + if ap_proc.is_started_by_us() and ap_proc.get_async_profiler_status().running: + return AsyncProfilerStartStatus.STARTED_BY_US started = ap_proc.start_async_profiler(self._interval, ap_timeout=self._ap_timeout) - if not started: + if started == AsyncProfilerStartStatus.ALREADY_RUNNING: logger.info(f"Found async-profiler already started on {ap_proc.process.pid}, trying to stop it...") # stop, and try to start again. this might happen if AP & gProfiler go out of sync: for example, # gProfiler being stopped brutally, while AP keeps running. If gProfiler is later started again, it will @@ -1139,10 +1229,14 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration # surely does. ap_proc.stop_async_profiler(with_output=False) started = ap_proc.start_async_profiler(self._interval, second_try=True, ap_timeout=self._ap_timeout) - if not started: + if started == AsyncProfilerStartStatus.ALREADY_RUNNING: raise Exception( f"async-profiler is still running in {ap_proc.process.pid}, even after trying to stop it!" 
) + return AsyncProfilerStartStatus.STARTED_BY_US + + def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount: + self._prepare_async_profiler(ap_proc) try: wait_event( @@ -1157,8 +1251,9 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration logger.debug(f"Profiled process {ap_proc.process.pid} exited before stopping async-profiler") # fall-through - try to read the output, since async-profiler writes it upon JVM exit. finally: + # don't stop profiler now; will do it in stop() if is_process_running(ap_proc.process): - ap_log = ap_proc.stop_async_profiler(True) + ap_log = ap_proc.dump_from_async_profiler(self._ap_timeout) if self._report_meminfo: self._log_mem_usage(ap_log, ap_proc.process.pid) @@ -1168,6 +1263,8 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration return self._profiling_error_stack("error", "process exited before reading the output", comm) else: logger.info(f"Finished profiling process {ap_proc.process.pid}") + # TODO: adjust output for cumulative frame count - async-profiler doesn't reset it + # TODO: account for dump skew - time between last dump output and the next call to snapshot() return parse_one_collapsed(output, comm) def _check_hotspot_error(self, ap_proc: AsyncProfiledProcess) -> None: @@ -1226,6 +1323,9 @@ def stop(self) -> None: if self._enabled_proc_events_java: proc_events.unregister_exit_callback(self._proc_exit_callback) self._enabled_proc_events_java = False + pids_to_remove = list(self._pid_to_ap_proc.keys()) + for pid in pids_to_remove: + self._stop_profiling_for_pid(pid) super().stop() def _proc_exit_callback(self, tid: int, pid: int, exit_code: int) -> None: @@ -1294,6 +1394,12 @@ def _handle_new_kernel_messages(self) -> None: else: self._handle_kernel_messages(messages) + def _stop_profiling_for_pid(self, pid: int) -> None: + ap_proc = self._pid_to_ap_proc.pop(pid, None) + if ap_proc is not None: + 
ap_proc.stop_async_profiler(False) + ap_proc.teardown() + def snapshot(self) -> ProcessToProfileData: try: return super().snapshot() @@ -1303,4 +1409,5 @@ def snapshot(self) -> ProcessToProfileData: self._want_to_profile_pids -= self._pids_to_remove for pid in self._pids_to_remove: self._pid_to_java_version.pop(pid, None) + self._stop_profiling_for_pid(pid) self._pids_to_remove.clear() diff --git a/tests/test_java.py b/tests/test_java.py index adb8a4aa0..51d52d477 100644 --- a/tests/test_java.py +++ b/tests/test_java.py @@ -32,6 +32,7 @@ from gprofiler.profilers.java import ( JAVA_SAFEMODE_ALL, AsyncProfiledProcess, + AsyncProfilerStartStatus, JavaFlagCollectionOptions, JavaProfiler, frequency_to_ap_interval, @@ -121,7 +122,7 @@ def test_async_profiler_already_running( ap_safemode=0, ap_args="", ) as ap_proc: - assert ap_proc.start_async_profiler(frequency_to_ap_interval(11)) + assert ap_proc.start_async_profiler(frequency_to_ap_interval(11)) == AsyncProfilerStartStatus.STARTED_BY_US assert any("libasyncProfiler.so" in m.path for m in process.memory_maps()) # run "status" with AsyncProfiledProcessForTests( @@ -249,7 +250,9 @@ def test_hotspot_error_file( start_async_profiler = AsyncProfiledProcess.start_async_profiler # Simulate crashing process - def start_async_profiler_and_crash(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> bool: + def start_async_profiler_and_crash( + self: AsyncProfiledProcess, *args: Any, **kwargs: Any + ) -> AsyncProfilerStartStatus: result = start_async_profiler(self, *args, **kwargs) self.process.send_signal(signal.SIGBUS) return result @@ -358,7 +361,10 @@ def test_async_profiler_stops_after_given_timeout( ap_safemode=0, ap_args="", ) as ap_proc: - assert ap_proc.start_async_profiler(frequency_to_ap_interval(11), ap_timeout=timeout_s) + assert ( + ap_proc.start_async_profiler(frequency_to_ap_interval(11), ap_timeout=timeout_s) + == AsyncProfilerStartStatus.STARTED_BY_US + ) ap_proc.status_async_profiler() assert "Profiling is 
running for " in cast_away_optional(ap_proc.read_output()) @@ -538,7 +544,9 @@ def test_java_appid_and_metadata_before_process_exits( start_async_profiler = AsyncProfiledProcess.start_async_profiler # Make the process exit before profiling ends - def start_async_profiler_and_interrupt(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> bool: + def start_async_profiler_and_interrupt( + self: AsyncProfiledProcess, *args: Any, **kwargs: Any + ) -> AsyncProfilerStartStatus: result = start_async_profiler(self, *args, **kwargs) time.sleep(3) self.process.send_signal(signal.SIGINT) From eca2e204edcfcbb38c8def779729dd0b04f64eac Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Tue, 21 Mar 2023 00:07:47 +0100 Subject: [PATCH 02/11] Fix lint --- gprofiler/profilers/java.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index a599e2af3..9a6e2424b 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -1126,11 +1126,20 @@ def _get_ap_proc( jattach_timeout: int, mcache: int, collect_meminfo: bool, + include_method_modifiers: bool = False, ) -> AsyncProfiledProcess: ap_proc: Optional[AsyncProfiledProcess] = self._pid_to_ap_proc.get(process.pid, None) if ap_proc is None: ap_proc = AsyncProfiledProcess( - process, profiler_state, mode, ap_safemode, ap_args, jattach_timeout, mcache, collect_meminfo + process, + profiler_state, + mode, + ap_safemode, + ap_args, + jattach_timeout, + mcache, + collect_meminfo, + include_method_modifiers, ) ap_proc.setup() self._pid_to_ap_proc[process.pid] = ap_proc From e7d358c7b66a4c250d02295d06bf31e0cd93fdcb Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Tue, 21 Mar 2023 19:28:13 +0100 Subject: [PATCH 03/11] Test fixes --- tests/test_java.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/test_java.py b/tests/test_java.py index 51d52d477..b2b7ccb34 100644 --- a/tests/test_java.py +++ 
b/tests/test_java.py @@ -472,12 +472,14 @@ def test_java_noexec_dirs( # should use this path instead of /tmp/gprofiler_tmp/... jattach_loads = filter_jattach_load_records(caplog.records) - # 2 entries - start and stop - assert len(jattach_loads) == 2 + # 3 entries - start and dump and stop + assert len(jattach_loads) == 3 # 3rd part of commandline to AP - shall begin with non-default directory assert all( log_record_extra(jl)["command"][3].startswith("/run/gprofiler_tmp/async-profiler-") for jl in jattach_loads ) + # test the invoked async-profiler actions against the sequence start, dump, stop + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump", "stop"] @pytest.mark.parametrize("in_container", [True]) @@ -521,10 +523,12 @@ def test_java_symlinks_in_paths( assert_collapsed(snapshot_pid_collapsed(profiler, application_pid)) jattach_loads = filter_jattach_load_records(caplog.records) - # 2 entries - start and stop - assert len(jattach_loads) == 2 + # 3 entries - start and dump and stop + assert len(jattach_loads) == 3 # 3rd part of commandline to AP - shall begin with the final, resolved path. 
assert all(log_record_extra(jl)["command"][3].startswith("/run/final_tmp/gprofiler_tmp/") for jl in jattach_loads) + # test the invoked async-profiler actions against the sequence start, dump, stop + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump", "stop"] @pytest.mark.parametrize("in_container", [True]) # only in container is enough From af2c0361b1f2bd6e203fbdf648273250d9b91e1e Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Tue, 21 Mar 2023 17:07:54 +0100 Subject: [PATCH 04/11] Check if target process is running before attempting to stop async-profiler --- gprofiler/profilers/java.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 9a6e2424b..50b499686 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -1406,7 +1406,8 @@ def _handle_new_kernel_messages(self) -> None: def _stop_profiling_for_pid(self, pid: int) -> None: ap_proc = self._pid_to_ap_proc.pop(pid, None) if ap_proc is not None: - ap_proc.stop_async_profiler(False) + if is_process_running(ap_proc.process): + ap_proc.stop_async_profiler(False) ap_proc.teardown() def snapshot(self) -> ProcessToProfileData: From 1b222de20e160f9a6457078c2402bf00e747a4d0 Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Wed, 22 Mar 2023 16:18:38 +0100 Subject: [PATCH 05/11] Fix cleanup when jattach socket gets unavailable --- gprofiler/profilers/java.py | 11 ++++++++++- tests/test_java.py | 3 ++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 50b499686..aa0e6d5e6 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -503,6 +503,7 @@ def __init__( self._include_method_modifiers = ",includemm" if include_method_modifiers else "" self._started_by_us = False self._setup_state: Optional[bool] = None + self._jattach_socket_disabled = False def 
_find_rw_exec_dir(self, available_dirs: Sequence[str]) -> str: """ @@ -697,6 +698,8 @@ def _run_async_profiler(self, cmd: List[str]) -> str: raise JattachTimeout(*args, timeout=self._jattach_timeout) from None elif e.stderr == b"Could not start attach mechanism: No such file or directory\n": # this is true for jattach_hotspot + # in this case socket won't be recreated and jattach calls are useless + self._jattach_socket_disabled = True raise JattachSocketMissingException(*args) from None else: raise JattachException(*args) from None @@ -774,6 +777,9 @@ def read_output(self) -> Optional[str]: def is_started_by_us(self) -> bool: return self._started_by_us + def is_jattach_socket_disabled(self) -> bool: + return self._jattach_socket_disabled + @register_profiler( "Java", @@ -1407,7 +1413,10 @@ def _stop_profiling_for_pid(self, pid: int) -> None: ap_proc = self._pid_to_ap_proc.pop(pid, None) if ap_proc is not None: if is_process_running(ap_proc.process): - ap_proc.stop_async_profiler(False) + if not ap_proc.is_jattach_socket_disabled(): + ap_proc.stop_async_profiler(False) + else: + logger.debug("Profiled PID lost jattach socket; async profiler won't be stopped immediately") ap_proc.teardown() def snapshot(self) -> ProcessToProfileData: diff --git a/tests/test_java.py b/tests/test_java.py index b2b7ccb34..7f7312a36 100644 --- a/tests/test_java.py +++ b/tests/test_java.py @@ -580,7 +580,8 @@ def test_java_attach_socket_missing( profiler_state: ProfilerState, ) -> None: """ - Tests that we get the proper JattachMissingSocketException when the attach socket is deleted. + Tests that we get the proper JattachSocketMissingException when the attach socket is deleted. + This indicates that socket won't be recreated and we won't be able to utilize jattach. 
""" with make_java_profiler( From c7415577f933b9500613b7206156f7b46c19d560 Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Thu, 23 Mar 2023 16:16:09 +0000 Subject: [PATCH 06/11] Make profiling continuous with async-profiler's recycle feature --- gprofiler/profilers/java.py | 17 ++++++++++------- scripts/async_profiler_build_shared.sh | 6 +++--- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index ba104404e..68f60c6d1 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -632,6 +632,9 @@ def _get_interval_arg(self, interval: int) -> str: return f",alloc={interval}" return f",interval={interval}" + def _get_ap_recycle_args(self, ap_timeout: int) -> str: + return f",recycle,timeout={ap_timeout}" + def _get_start_cmd(self, interval: int, ap_timeout: int) -> List[str]: return self._get_base_cmd() + [ f"start,event={self._mode}" @@ -650,18 +653,20 @@ def _get_stop_cmd(self, with_output: bool) -> List[str]: f"{self._get_extra_ap_args()}" ] - def _get_dump_cmd(self, ap_timeout: int) -> List[str]: + def _get_dump_cmd(self, ap_timeout: int = 0) -> List[str]: return self._get_base_cmd() + [ f"dump" f",log={self._log_path_process}" f"{self._get_ap_output_args()}" - f",timeout={ap_timeout}" + f"{self._get_ap_recycle_args(ap_timeout)}" f"{',lib' if self._profiler_state.insert_dso_name else ''}{',meminfolog' if self._collect_meminfo else ''}" f"{self._get_extra_ap_args()}" ] - def _get_status_cmd(self) -> List[str]: - return self._get_base_cmd() + ["status" f"{self._get_ap_output_args()}"] + def _get_status_cmd(self, ap_timeout: int = 0) -> List[str]: + return self._get_base_cmd() + [ + "status" f"{self._get_ap_output_args()}" f"{self._get_ap_recycle_args(ap_timeout)}" + ] def _read_ap_log(self) -> str: if not os.path.exists(self._log_path_host): @@ -946,9 +951,7 @@ def __init__( self._jattach_jcmd_runner = JattachJcmdRunner( stop_event=self._profiler_state.stop_event, 
jattach_timeout=self._jattach_timeout ) - # TODO: disable timeout until support for restart is added in async-profiler - # self._ap_timeout = self._duration + self._AP_EXTRA_TIMEOUT_S - self._ap_timeout = 0 + self._ap_timeout = self._duration + self._AP_EXTRA_TIMEOUT_S application_identifiers.ApplicationIdentifiers.init_java(self._jattach_jcmd_runner) self._metadata = JavaMetadata( self._profiler_state.stop_event, self._jattach_jcmd_runner, self._collect_jvm_flags diff --git a/scripts/async_profiler_build_shared.sh b/scripts/async_profiler_build_shared.sh index 4f7af297e..875ab7fd8 100755 --- a/scripts/async_profiler_build_shared.sh +++ b/scripts/async_profiler_build_shared.sh @@ -5,10 +5,10 @@ # set -euo pipefail -VERSION=v2.9g6 -GIT_REV="4938ce815be597fafc6a7a185a16ff394b9d7f41" +VERSION=timeout-recycle +GIT_REV="666a620f2519405ef6630aa36ceb03166a42282b" -git clone --depth 1 -b "$VERSION" https://github.com/Granulate/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV" +git clone --depth 1 -b "$VERSION" https://github.com/marcin-ol/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV" make all # add a version file to the build directory From 4e4b04759cf1d9f4d273162994a08f98cb80b249 Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Thu, 23 Mar 2023 17:25:14 +0000 Subject: [PATCH 07/11] Adjust stacks for cumulative outcomes from dump --- gprofiler/profilers/java.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 5a6e56a5b..7da2416b9 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -968,6 +968,7 @@ def __init__( self._include_method_modifiers = java_include_method_modifiers # keep ap_process instances to enable continuous profiling self._pid_to_ap_proc: Dict[int, Optional[AsyncProfiledProcess]] = {} + self._cumulative_stacks: StackToSampleCount = StackToSampleCount() def _init_ap_mode(self, profiling_mode: 
str, ap_mode: str) -> None: assert profiling_mode in ("cpu", "allocation"), "async-profiler support only cpu/allocation profiling modes" @@ -1278,8 +1279,16 @@ def _prepare_async_profiler(self, ap_proc: AsyncProfiledProcess) -> AsyncProfile raise Exception( f"async-profiler is still running in {ap_proc.process.pid}, even after trying to stop it!" ) + self._cumulative_stacks.clear() return AsyncProfilerStartStatus.STARTED_BY_US + def _get_adjusted_output_stacks(self, input_stacks: StackToSampleCount) -> StackToSampleCount: + adjusted_stacks = StackToSampleCount(input_stacks) + if len(self._cumulative_stacks) > 0: + adjusted_stacks -= self._cumulative_stacks + self._cumulative_stacks = input_stacks + return adjusted_stacks + def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount: self._prepare_async_profiler(ap_proc) @@ -1310,7 +1319,8 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration logger.info(f"Finished profiling process {ap_proc.process.pid}") # TODO: adjust output for cumulative frame count - async-profiler doesn't reset it # TODO: account for dump skew - time between last dump output and the next call to snapshot() - return parse_one_collapsed(output, comm) + stacks = self._get_adjusted_output_stacks(parse_one_collapsed(output, comm)) + return stacks def _check_hotspot_error(self, ap_proc: AsyncProfiledProcess) -> None: pid = ap_proc.process.pid From ea38081f8b5a9b2b15c75b3eb8672142ac7f33f4 Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Thu, 23 Mar 2023 23:01:00 +0000 Subject: [PATCH 08/11] Adjust output of each profiled java process independently --- gprofiler/profilers/java.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 7da2416b9..8ec03a7ce 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -8,6 +8,7 @@ import re import secrets 
import signal +from collections import defaultdict from dataclasses import dataclass from enum import Enum, auto from pathlib import Path @@ -968,7 +969,8 @@ def __init__( self._include_method_modifiers = java_include_method_modifiers # keep ap_process instances to enable continuous profiling self._pid_to_ap_proc: Dict[int, Optional[AsyncProfiledProcess]] = {} - self._cumulative_stacks: StackToSampleCount = StackToSampleCount() + # keep stacks recorded for each pid to properly derive stacks for the profiling duration + self._pid_to_dumped_stacks: Dict[int, StackToSampleCount] = defaultdict(StackToSampleCount) def _init_ap_mode(self, profiling_mode: str, ap_mode: str) -> None: assert profiling_mode in ("cpu", "allocation"), "async-profiler support only cpu/allocation profiling modes" @@ -1279,14 +1281,16 @@ def _prepare_async_profiler(self, ap_proc: AsyncProfiledProcess) -> AsyncProfile raise Exception( f"async-profiler is still running in {ap_proc.process.pid}, even after trying to stop it!" 
) - self._cumulative_stacks.clear() + # discard saved dump stacks for process we're (re)starting here + self._pid_to_dumped_stacks.pop(ap_proc.process.pid, None) return AsyncProfilerStartStatus.STARTED_BY_US - def _get_adjusted_output_stacks(self, input_stacks: StackToSampleCount) -> StackToSampleCount: + def _get_adjusted_output_stacks(self, pid: int, input_stacks: StackToSampleCount) -> StackToSampleCount: adjusted_stacks = StackToSampleCount(input_stacks) - if len(self._cumulative_stacks) > 0: - adjusted_stacks -= self._cumulative_stacks - self._cumulative_stacks = input_stacks + dumped_stacks = self._pid_to_dumped_stacks[pid] + if len(dumped_stacks) > 0: + adjusted_stacks -= dumped_stacks + self._pid_to_dumped_stacks[pid] = input_stacks return adjusted_stacks def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount: @@ -1319,8 +1323,9 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration logger.info(f"Finished profiling process {ap_proc.process.pid}") # TODO: adjust output for cumulative frame count - async-profiler doesn't reset it # TODO: account for dump skew - time between last dump output and the next call to snapshot() - stacks = self._get_adjusted_output_stacks(parse_one_collapsed(output, comm)) - return stacks + dumped_stacks = parse_one_collapsed(output, comm) + adjusted_stacks = self._get_adjusted_output_stacks(ap_proc.process.pid, dumped_stacks) + return adjusted_stacks def _check_hotspot_error(self, ap_proc: AsyncProfiledProcess) -> None: pid = ap_proc.process.pid @@ -1451,6 +1456,7 @@ def _handle_new_kernel_messages(self) -> None: def _stop_profiling_for_pid(self, pid: int) -> None: ap_proc = self._pid_to_ap_proc.pop(pid, None) + self._pid_to_dumped_stacks.pop(pid, None) if ap_proc is not None: if is_process_running(ap_proc.process): if not ap_proc.is_jattach_socket_disabled(): From 2feb8b7a7a94b01a404f997adeff02becc54a077 Mon Sep 17 00:00:00 2001 From: Marcin 
Olszewski Date: Fri, 14 Jul 2023 14:52:28 +0200 Subject: [PATCH 09/11] Use updated async-profiler, synced with 2.10 --- scripts/async_profiler_build_shared.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/async_profiler_build_shared.sh b/scripts/async_profiler_build_shared.sh index 937580880..21c1acefa 100755 --- a/scripts/async_profiler_build_shared.sh +++ b/scripts/async_profiler_build_shared.sh @@ -5,8 +5,8 @@ # set -euo pipefail -VERSION=v2.10g1 -GIT_REV="b3baf6010e31378679a28fe1cd30168f51cc1ade" +VERSION=timeout-recycle +GIT_REV="684098cf1b94c1f5d89f1d591e6e910e1c8c60a9" git clone --depth 1 -b "$VERSION" https://github.com/marcin-ol/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV" make all From b7d0f326d9cde26cc3fb2349bb25bfd2dc89ea43 Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Wed, 19 Jul 2023 16:22:13 +0200 Subject: [PATCH 10/11] Reset traces when calling dump --- gprofiler/profilers/java.py | 7 ++----- scripts/async_profiler_build_shared.sh | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index c4b0ecc2f..78f735114 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -671,7 +671,7 @@ def _get_stop_cmd(self, with_output: bool) -> List[str]: def _get_dump_cmd(self, ap_timeout: int = 0) -> List[str]: return self._get_base_cmd() + [ - f"dump" + f"dump,resettrace" f",log={self._log_path_process}" f"{self._get_ap_output_args()}" f"{self._get_ap_recycle_args(ap_timeout)}" @@ -1348,11 +1348,8 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration return self._profiling_error_stack("error", "process exited before reading the output", comm) else: logger.info(f"Finished profiling process {ap_proc.process.pid}") - # TODO: adjust output for cumulative frame count - async-profiler doesn't reset it - # TODO: account for dump skew - time between last dump output and the next call 
to snapshot() dumped_stacks = parse_one_collapsed(output, comm) - adjusted_stacks = self._get_adjusted_output_stacks(ap_proc.process.pid, dumped_stacks) - return adjusted_stacks + return dumped_stacks def _check_hotspot_error(self, ap_proc: AsyncProfiledProcess) -> None: pid = ap_proc.process.pid diff --git a/scripts/async_profiler_build_shared.sh b/scripts/async_profiler_build_shared.sh index 21c1acefa..f227f6b8d 100755 --- a/scripts/async_profiler_build_shared.sh +++ b/scripts/async_profiler_build_shared.sh @@ -6,7 +6,7 @@ set -euo pipefail VERSION=timeout-recycle -GIT_REV="684098cf1b94c1f5d89f1d591e6e910e1c8c60a9" +GIT_REV="46a6883d084e443a0b2a91ba9208302e65fe216a" git clone --depth 1 -b "$VERSION" https://github.com/marcin-ol/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV" make all From 11e20b2ddddcbd91c0f22fafa16ffeb0d0815b7c Mon Sep 17 00:00:00 2001 From: Marcin Olszewski Date: Mon, 24 Jul 2023 14:45:06 +0200 Subject: [PATCH 11/11] Make java profiling truly continuous, invoking only dump --- gprofiler/profilers/java.py | 83 +++++++++++--------------- scripts/async_profiler_build_shared.sh | 2 +- tests/test_java.py | 64 ++++++++++++++++---- 3 files changed, 87 insertions(+), 62 deletions(-) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 78f735114..408ab1f1a 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -8,7 +8,6 @@ import re import secrets import signal -from collections import defaultdict from dataclasses import dataclass from enum import Enum, auto from pathlib import Path @@ -424,12 +423,17 @@ def get_ap_version() -> str: r"KB\n\n" ) +# Error reported by dump action, when profiler has not started; message defined in source: +# https://github.com/async-profiler/async-profiler/blob/v2.10/src/profiler.cpp#L1195 +_DUMP_ERROR_PROFILER_NOT_STARTED_RE = re.compile(r"\[ERROR\] Profiler has not started\n") + # Status reported by async profiler when it's started. 
Defined here: # https://github.com/async-profiler/async-profiler/blob/e3b7bfca227ae5c916f00abfacf0e61291df3a67/src/profiler.cpp#L1541 _AP_STATUS_RUNNING_RE = re.compile(r"Profiling is running for (?P<uptime>\d+) seconds\n") class AsyncProfilerStartStatus(Enum): + NOT_RUNNING = auto() STARTED_BY_US = auto() ALREADY_RUNNING = auto() @@ -518,6 +522,7 @@ def __init__( self._started_by_us = False self._setup_state: Optional[bool] = None self._jattach_socket_disabled = False + self._start_status = AsyncProfilerStartStatus.NOT_RUNNING def _find_rw_exec_dir(self, available_dirs: Sequence[str]) -> str: """ @@ -671,19 +676,18 @@ def _get_stop_cmd(self, with_output: bool) -> List[str]: def _get_dump_cmd(self, ap_timeout: int = 0) -> List[str]: return self._get_base_cmd() + [ - f"dump,resettrace" - f",log={self._log_path_process}" + f"dump," + # reset trace after dump to start each profiling cycle with zeroed counters + "resettrace," + # make profiler check it's active (not stopped due to timeout) when running dump + "dumpactive," + f"log={self._log_path_process}" f"{self._get_ap_output_args()}" f"{self._get_ap_recycle_args(ap_timeout)}" f"{',lib' if self._profiler_state.insert_dso_name else ''}{',meminfolog' if self._collect_meminfo else ''}" f"{self._get_extra_ap_args()}" ] - def _get_status_cmd(self, ap_timeout: int = 0) -> List[str]: - return self._get_base_cmd() + [ - "status" f"{self._get_ap_output_args()}" f"{self._get_ap_recycle_args(ap_timeout)}" - ] - def _read_ap_log(self) -> str: if not os.path.exists(self._log_path_host): return "(log file doesn't exist)" @@ -767,35 +771,29 @@ def start_async_profiler( start_cmd = self._get_start_cmd(interval, ap_timeout) try: self._run_async_profiler(start_cmd) - if not self._started_by_us: - self._started_by_us = True - return AsyncProfilerStartStatus.STARTED_BY_US + self._start_status = AsyncProfilerStartStatus.STARTED_BY_US + return self._start_status except JattachException as e: if e.is_ap_loaded: if ( e.returncode == 200 # 200 ==
AP's COMMAND_ERROR and e.get_ap_log() == "[ERROR] Profiler already started\n" ): - # profiler was already running - return AsyncProfilerStartStatus.ALREADY_RUNNING + self._start_status = AsyncProfilerStartStatus.ALREADY_RUNNING + return self._start_status raise def stop_async_profiler(self, with_output: bool) -> str: - return self._run_async_profiler(self._get_stop_cmd(with_output)) + output: str = self._run_async_profiler(self._get_stop_cmd(with_output)) + self._start_status = AsyncProfilerStartStatus.NOT_RUNNING + return output + + def reset_start_status(self) -> None: + self._start_status = AsyncProfilerStartStatus.NOT_RUNNING def dump_from_async_profiler(self, ap_timeout: int = 0) -> str: return self._run_async_profiler(self._get_dump_cmd(ap_timeout)) - def get_async_profiler_status(self) -> AsyncProfilerStatus: - status_cmd = self._get_status_cmd() - self._run_async_profiler(status_cmd) - message = self.read_output() - if message is not None: - m = _AP_STATUS_RUNNING_RE.match(message) - if m is not None: - return AsyncProfilerStatus(message, True, int(m.group("uptime"))) - return AsyncProfilerStatus(message, False) - def read_output(self) -> Optional[str]: try: return Path(self._output_path_host).read_text() @@ -806,7 +804,7 @@ def read_output(self) -> Optional[str]: raise def is_started_by_us(self) -> bool: - return self._started_by_us + return self._start_status == AsyncProfilerStartStatus.STARTED_BY_US def is_jattach_socket_disabled(self) -> bool: return self._jattach_socket_disabled @@ -996,8 +994,6 @@ def __init__( self._include_method_modifiers = java_include_method_modifiers # keep ap_process instances to enable continuous profiling self._pid_to_ap_proc: Dict[int, Optional[AsyncProfiledProcess]] = {} - # keep stacks recorded for each pid to properly derive stacks for the profiling duration - self._pid_to_dumped_stacks: Dict[int, StackToSampleCount] = defaultdict(StackToSampleCount) def _init_ap_mode(self, profiling_mode: str, ap_mode: str) -> None: 
assert profiling_mode in ("cpu", "allocation"), "async-profiler support only cpu/allocation profiling modes" @@ -1291,8 +1287,8 @@ def _log_mem_usage(ap_log: str, pid: int) -> None: def _prepare_async_profiler(self, ap_proc: AsyncProfiledProcess) -> AsyncProfilerStartStatus: # first time we need to start profiler ourselves - # afterwards we will check if it's still running by looking at its status - if ap_proc.is_started_by_us() and ap_proc.get_async_profiler_status().running: + # after that we rely on async-profiler dump and recycle functions to signal failed dump + if ap_proc.is_started_by_us(): return AsyncProfilerStartStatus.STARTED_BY_US started = ap_proc.start_async_profiler(self._interval, ap_timeout=self._ap_timeout) if started == AsyncProfilerStartStatus.ALREADY_RUNNING: @@ -1308,18 +1304,8 @@ def _prepare_async_profiler(self, ap_proc: AsyncProfiledProcess) -> AsyncProfile raise Exception( f"async-profiler is still running in {ap_proc.process.pid}, even after trying to stop it!"
) - # discard saved dump stacks for process we're (re)starting here - self._pid_to_dumped_stacks.pop(ap_proc.process.pid, None) return AsyncProfilerStartStatus.STARTED_BY_US - def _get_adjusted_output_stacks(self, pid: int, input_stacks: StackToSampleCount) -> StackToSampleCount: - adjusted_stacks = StackToSampleCount(input_stacks) - dumped_stacks = self._pid_to_dumped_stacks[pid] - if len(dumped_stacks) > 0: - adjusted_stacks -= dumped_stacks - self._pid_to_dumped_stacks[pid] = input_stacks - return adjusted_stacks - def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount: self._prepare_async_profiler(ap_proc) @@ -1338,9 +1324,16 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration finally: # don't stop profiler now; will do it in stop() if is_process_running(ap_proc.process): - ap_log = ap_proc.dump_from_async_profiler(self._ap_timeout) - if self._report_meminfo: - self._log_mem_usage(ap_log, ap_proc.process.pid) + try: + ap_log = ap_proc.dump_from_async_profiler(self._ap_timeout) + if self._report_meminfo: + self._log_mem_usage(ap_log, ap_proc.process.pid) + except JattachException as e: + if _DUMP_ERROR_PROFILER_NOT_STARTED_RE.search(e.get_ap_log()) is not None: + logger.warning(f"Profiler for process {ap_proc.process.pid} wasn't ready for collecting stacks") + ap_proc.reset_start_status() + return self._profiling_error_stack("error", "profiler wasn't ready for collecting stacks", comm) + raise output = ap_proc.read_output() if output is None: @@ -1482,13 +1475,7 @@ def _handle_new_kernel_messages(self) -> None: def _stop_profiling_for_pid(self, pid: int) -> None: ap_proc = self._pid_to_ap_proc.pop(pid, None) - self._pid_to_dumped_stacks.pop(pid, None) if ap_proc is not None: - if is_process_running(ap_proc.process): - if not ap_proc.is_jattach_socket_disabled(): - ap_proc.stop_async_profiler(False) - else: - logger.debug("Profiled PID lost jattach socket; async profiler won't 
be stopped immediately") ap_proc.teardown() def snapshot(self) -> ProcessToProfileData: diff --git a/scripts/async_profiler_build_shared.sh b/scripts/async_profiler_build_shared.sh index f227f6b8d..fad7f2f03 100755 --- a/scripts/async_profiler_build_shared.sh +++ b/scripts/async_profiler_build_shared.sh @@ -6,7 +6,7 @@ set -euo pipefail VERSION=timeout-recycle -GIT_REV="46a6883d084e443a0b2a91ba9208302e65fe216a" +GIT_REV="7d2c6abcc0b371fa0976f60cf88fbce0f4e60dc8" git clone --depth 1 -b "$VERSION" https://github.com/marcin-ol/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV" make all diff --git a/tests/test_java.py b/tests/test_java.py index 30964e215..1ef58a5b4 100644 --- a/tests/test_java.py +++ b/tests/test_java.py @@ -485,14 +485,14 @@ def test_java_noexec_dirs( # should use this path instead of /tmp/gprofiler_tmp/... jattach_loads = filter_jattach_load_records(caplog.records) - # 3 entries - start and dump and stop - assert len(jattach_loads) == 3 + # 2 entries - start and dump + assert len(jattach_loads) == 2 # 3rd part of commandline to AP - shall begin with non-default directory assert all( log_record_extra(jl)["command"][3].startswith("/run/gprofiler_tmp/async-profiler-") for jl in jattach_loads ) - # test the invoked async-profiler actions against the sequence start, dump, stop - assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump", "stop"] + # test the invoked async-profiler actions against the sequence start, dump + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump"] @pytest.mark.parametrize("in_container", [True]) @@ -536,12 +536,12 @@ def test_java_symlinks_in_paths( assert_collapsed(snapshot_pid_collapsed(profiler, application_pid)) jattach_loads = filter_jattach_load_records(caplog.records) - # 3 entries - start and dump and stop - assert len(jattach_loads) == 3 + # 2 entries - start and dump + assert len(jattach_loads) == 2 # 
3rd part of commandline to AP - shall begin with the final, resolved path. assert all(log_record_extra(jl)["command"][3].startswith("/run/final_tmp/gprofiler_tmp/") for jl in jattach_loads) # test the invoked async-profiler actions against the sequence start, dump, stop - assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump", "stop"] + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump"] @pytest.mark.parametrize("in_container", [True]) # only in container is enough @@ -610,6 +610,44 @@ def test_java_attach_socket_missing( assert next(iter(profile.stacks.keys())) == "java;[Profiling error: exception JattachSocketMissingException]" +@pytest.mark.parametrize("in_container", [True]) # only in container is enough +def test_async_profiler_dump_profiling_recovery( + application_pid: int, + profiler_state: ProfilerState, + assert_collapsed: AssertInCollapsed, + caplog: LogCaptureFixture, +) -> None: + """ + Tests that async-profiler can recover from a timeout. + Continuous profiling with dump can fail, if dump call gets delayed. + Another profiling cycle should recover from timeout and start profiler again. 
+ """ + caplog.set_level(logging.DEBUG) + with make_java_profiler( + profiler_state, + duration=1, + # java_jattach_timeout=31, + ) as profiler: + profiler._ap_timeout = 2 + assert_collapsed(snapshot_pid_collapsed(profiler, application_pid)) + + # delay next snapshot to trigger timeout + time.sleep(3) + round2_stacks = snapshot_pid_collapsed(profiler, application_pid) + assert round2_stacks == Counter({"java;[Profiling error: profiler wasn't ready for collecting stacks]": 1}) + + # start another round to test recovery of async-profiler + caplog.clear() + round3_stacks = snapshot_pid_collapsed(profiler, application_pid) + assert_collapsed(round3_stacks) + # verify async-profiler was started before another dump + jattach_loads = filter_jattach_load_records(caplog.records) + # 2 entries - start and dump + assert len(jattach_loads) == 2 + # test the invoked async-profiler actions against the sequence start, dump + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump"] + + # we know what messages to expect when in container, not on the host Java @pytest.mark.parametrize("in_container", [True]) def test_java_jattach_async_profiler_log_output( @@ -798,21 +836,21 @@ def test_no_stray_output_in_stdout_stderr( assert_collapsed: AssertInCollapsed, profiler_state: ProfilerState, ) -> None: - # save original stop function - stop_async_profiler = AsyncProfiledProcess.stop_async_profiler + # save original dump function + dump_from_async_profiler = AsyncProfiledProcess.dump_from_async_profiler - # replace async profiler stop routine to trigger flushing standard output - def flush_output_and_stop_async_profiler(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> str: + # replace async profiler dump routine to trigger flushing standard output + def flush_output_and_dump_from_async_profiler(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> str: # Call 'version' action on async-profiler to make sure writes to
stdout are flushed. Handling of 'version' # action involves calling flush on output stream: # (https://github.com/Granulate/async-profiler/blob/58c62fe4e816b60907ca84e315936834fc1cbae4/src/profiler.cpp#L1548) self._run_async_profiler( self._get_base_cmd() + [f"version" f",log={self._log_path_process}"], ) - result = stop_async_profiler(self, *args, **kwargs) + result = dump_from_async_profiler(self, *args, **kwargs) return result - monkeypatch.setattr(AsyncProfiledProcess, "stop_async_profiler", flush_output_and_stop_async_profiler) + monkeypatch.setattr(AsyncProfiledProcess, "dump_from_async_profiler", flush_output_and_dump_from_async_profiler) with make_java_profiler( profiler_state,