diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 9bcc4cb67..fe4c8afba 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -8,7 +8,8 @@ import re import secrets import signal -from enum import Enum +from dataclasses import dataclass +from enum import Enum, auto from pathlib import Path from subprocess import CompletedProcess from threading import Event, Lock @@ -444,6 +445,27 @@ def get_ap_version() -> str: r"KB\n\n" ) +# Error reported by dump action, when profiler has not started; message defined in source: +# https://github.com/async-profiler/async-profiler/blob/v2.10/src/profiler.cpp#L1195 +_DUMP_ERROR_PROFILER_NOT_STARTED_RE = re.compile(r"\[ERROR\] Profiler has not started\n") + +# Status reported by async profiler when it's started. Defined here: +# https://github.com/async-profiler/async-profiler/blob/e3b7bfca227ae5c916f00abfacf0e61291df3a67/src/profiler.cpp#L1541 +_AP_STATUS_RUNNING_RE = re.compile(r"Profiling is running for (?P<uptime>\d+) seconds\n") + + +class AsyncProfilerStartStatus(Enum): + NOT_RUNNING = auto() + STARTED_BY_US = auto() + ALREADY_RUNNING = auto() + + +@dataclass(frozen=True) +class AsyncProfilerStatus: + message: Optional[str] + running: bool + uptime: int = 0 + class AsyncProfiledProcess: """ @@ -522,6 +544,10 @@ def __init__( self._collect_meminfo = collect_meminfo self._include_method_modifiers = ",includemm" if include_method_modifiers else "" self._include_line_numbers = ",includeln" if java_line_numbers == "line-of-function" else "" + self._started_by_us = False + self._setup_state: Optional[bool] = None + self._jattach_socket_disabled = False + self._start_status = AsyncProfilerStartStatus.NOT_RUNNING def _find_rw_exec_dir(self) -> str: """ @@ -550,6 +576,11 @@ def _find_rw_exec_dir(self) -> str: ) def __enter__(self: T) -> T: + self.setup() + return self + + def setup(self) -> None: + assert self._setup_state is None, "AsyncProfilerProcess must not be setup twice" # create the 
directory structure for executable libap, make sure it's owned by root # for sanity & simplicity, mkdir_owned_root() does not support creating parent directories, as this allows # the caller to absentmindedly ignore the check of the parents ownership. @@ -569,8 +600,7 @@ def __enter__(self: T) -> T: self._recreate_log() # copy libasyncProfiler.so if needed self._copy_libap() - - return self + self._setup_state = True def __exit__( self, @@ -578,11 +608,16 @@ def __exit__( exc_val: Optional[BaseException], exc_ctb: Optional[TracebackType], ) -> None: + self.teardown() + + def teardown(self) -> None: + assert self._setup_state is True, "AsyncProfilerProcess must not run teardown twice" # ignore_errors because we are deleting paths via /proc/pid/root - and the pid # we're using might have gone down already. # remove them as best effort. remove_path(self._output_path_host, missing_ok=True) remove_path(self._log_path_host, missing_ok=True) + self._setup_state = False def _existing_realpath(self, path: str) -> Optional[str]: """ @@ -664,6 +699,9 @@ def _get_interval_arg(self, interval: int) -> str: return f",alloc={interval}" return f",interval={interval}" + def _get_ap_recycle_args(self, ap_timeout: int) -> str: + return f",recycle,timeout={ap_timeout}" + def _get_start_cmd(self, interval: int, ap_timeout: int) -> List[str]: return self._get_base_cmd() + [ f"start,event={self._mode}" @@ -682,6 +720,20 @@ def _get_stop_cmd(self, with_output: bool) -> List[str]: f"{self._get_extra_ap_args()}" ] + def _get_dump_cmd(self, ap_timeout: int = 0) -> List[str]: + return self._get_base_cmd() + [ + f"dump," + # reset trace after dump to start each profiling cycle with zeroed counters + "resettrace," + # make profiler check it's active (not stopped due to timeout) when running dump + "dumpactive," + f"log={self._log_path_process}" + f"{self._get_ap_output_args()}" + f"{self._get_ap_recycle_args(ap_timeout)}" + f"{',lib' if self._profiler_state.insert_dso_name else ''}{',meminfolog' 
if self._collect_meminfo else ''}" + f"{self._get_extra_ap_args()}" + ] + def _read_ap_log(self) -> str: if not os.path.exists(self._log_path_host): return "(log file doesn't exist)" @@ -719,6 +771,8 @@ def _run_async_profiler(self, cmd: List[str]) -> str: raise JattachTimeout(*args, timeout=self._jattach_timeout) from None elif e.stderr == "Could not start attach mechanism: No such file or directory\n": # this is true for jattach_hotspot + # in this case socket won't be recreated and jattach calls are useless + self._jattach_socket_disabled = True raise JattachSocketMissingException(*args) from None else: raise JattachException(*args) from None @@ -750,9 +804,11 @@ def _run_fdtransfer(self) -> None: timeout=self._FDTRANSFER_TIMEOUT, ) - def start_async_profiler(self, interval: int, second_try: bool = False, ap_timeout: int = 0) -> bool: + def start_async_profiler( + self, interval: int, second_try: bool = False, ap_timeout: int = 0 + ) -> AsyncProfilerStartStatus: """ - Returns True if profiling was started; False if it was already started. + Returns STARTED_BY_US if profiling was started; ALREADY_RUNNING if it was already started. 
ap_timeout defaults to 0, which means "no timeout" for AP (see call to startTimer() in profiler.cpp) """ if self._mode == "cpu" and not second_try: @@ -761,19 +817,28 @@ def start_async_profiler(self, interval: int, second_try: bool = False, ap_timeo start_cmd = self._get_start_cmd(interval, ap_timeout) try: self._run_async_profiler(start_cmd) - return True + self._start_status = AsyncProfilerStartStatus.STARTED_BY_US + return self._start_status except JattachException as e: if e.is_ap_loaded: if ( e.returncode == 200 # 200 == AP's COMMAND_ERROR and e.get_ap_log() == "[ERROR] Profiler already started\n" ): - # profiler was already running - return False + self._start_status = AsyncProfilerStartStatus.ALREADY_RUNNING + return self._start_status raise def stop_async_profiler(self, with_output: bool) -> str: - return self._run_async_profiler(self._get_stop_cmd(with_output)) + output: str = self._run_async_profiler(self._get_stop_cmd(with_output)) + self._start_status = AsyncProfilerStartStatus.NOT_RUNNING + return output + + def reset_start_status(self) -> None: + self._start_status = AsyncProfilerStartStatus.NOT_RUNNING + + def dump_from_async_profiler(self, ap_timeout: int = 0) -> str: + return self._run_async_profiler(self._get_dump_cmd(ap_timeout)) def read_output(self) -> Optional[str]: try: @@ -784,6 +849,12 @@ def read_output(self) -> Optional[str]: return None raise + def is_started_by_us(self) -> bool: + return self._start_status == AsyncProfilerStartStatus.STARTED_BY_US + + def is_jattach_socket_disabled(self) -> bool: + return self._jattach_socket_disabled + @register_profiler( "Java", @@ -976,6 +1047,8 @@ def __init__( self._java_full_hserr = java_full_hserr self._include_method_modifiers = java_include_method_modifiers self._java_line_numbers = java_line_numbers + # keep ap_process instances to enable continuous profiling + self._pid_to_ap_proc: Dict[int, Optional[AsyncProfiledProcess]] = {} def _init_ap_mode(self, profiling_mode: str, ap_mode: str) -> 
None: assert profiling_mode in ("cpu", "allocation"), "async-profiler support only cpu/allocation profiling modes" @@ -1160,6 +1233,37 @@ def _check_async_profiler_loaded(self, process: Process) -> bool: return False + def _get_ap_proc( + self, + process: Process, + profiler_state: ProfilerState, + mode: str, + ap_safemode: int, + ap_args: str, + jattach_timeout: int, + mcache: int, + collect_meminfo: bool, + include_method_modifiers: bool = False, + java_line_numbers: str = "none", + ) -> AsyncProfiledProcess: + ap_proc: Optional[AsyncProfiledProcess] = self._pid_to_ap_proc.get(process.pid, None) + if ap_proc is None: + ap_proc = AsyncProfiledProcess( + process, + profiler_state, + mode, + ap_safemode, + ap_args, + jattach_timeout, + mcache, + collect_meminfo, + include_method_modifiers, + java_line_numbers, + ) + ap_proc.setup() + self._pid_to_ap_proc[process.pid] = ap_proc + return ap_proc + def _profile_process(self, process: Process, duration: int, spawned: bool) -> ProfileData: comm = process_comm(process) exe = process_exe(process) @@ -1201,7 +1305,7 @@ def _profile_process(self, process: Process, duration: int, spawned: bool) -> Pr logger.debug("Process paths", pid=process.pid, execfn=execfn, exe=exe) logger.debug("Process mapped files", pid=process.pid, maps=set(m.path for m in process.memory_maps())) - with AsyncProfiledProcess( + ap_proc = self._get_ap_proc( process, self._profiler_state, self._mode, @@ -1212,8 +1316,8 @@ def _profile_process(self, process: Process, duration: int, spawned: bool) -> Pr self._report_meminfo, self._include_method_modifiers, self._java_line_numbers, - ) as ap_proc: - stackcollapse = self._profile_ap_process(ap_proc, comm, duration) + ) + stackcollapse = self._profile_ap_process(ap_proc, comm, duration) return ProfileData(stackcollapse, appid, app_metadata, container_name) @@ -1234,9 +1338,13 @@ def _log_mem_usage(ap_log: str, pid: int) -> None: pid=pid, ) - def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: 
str, duration: int) -> StackToSampleCount: + def _prepare_async_profiler(self, ap_proc: AsyncProfiledProcess) -> AsyncProfilerStartStatus: + # first time we need to start profiler ourselves + # after that we rely on async-profiler dump and recycle functions to signal failed dump + if ap_proc.is_started_by_us(): + return AsyncProfilerStartStatus.STARTED_BY_US started = ap_proc.start_async_profiler(self._interval, ap_timeout=self._ap_timeout) - if not started: + if started == AsyncProfilerStartStatus.ALREADY_RUNNING: logger.info(f"Found async-profiler already started on {ap_proc.process.pid}, trying to stop it...") # stop, and try to start again. this might happen if AP & gProfiler go out of sync: for example, # gProfiler being stopped brutally, while AP keeps running. If gProfiler is later started again, it will @@ -1245,10 +1353,14 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration # surely does. ap_proc.stop_async_profiler(with_output=False) started = ap_proc.start_async_profiler(self._interval, second_try=True, ap_timeout=self._ap_timeout) - if not started: + if started == AsyncProfilerStartStatus.ALREADY_RUNNING: raise Exception( f"async-profiler is still running in {ap_proc.process.pid}, even after trying to stop it!" ) + return AsyncProfilerStartStatus.STARTED_BY_US + + def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount: + self._prepare_async_profiler(ap_proc) try: wait_event( @@ -1263,10 +1375,18 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration logger.debug(f"Profiled process {ap_proc.process.pid} exited before stopping async-profiler") # fall-through - try to read the output, since async-profiler writes it upon JVM exit. 
finally: + # don't stop profiler now; will do it in stop() if is_process_running(ap_proc.process): - ap_log = ap_proc.stop_async_profiler(True) - if self._report_meminfo: - self._log_mem_usage(ap_log, ap_proc.process.pid) + try: + ap_log = ap_proc.dump_from_async_profiler(self._ap_timeout) + if self._report_meminfo: + self._log_mem_usage(ap_log, ap_proc.process.pid) + except JattachException as e: + if _DUMP_ERROR_PROFILER_NOT_STARTED_RE.search(e.get_ap_log()) is not None: + logger.warning(f"Profiler for process {ap_proc.process.pid} wasn't ready for collecting stacks") + ap_proc.reset_start_status() + return self._profiling_error_stack("error", "profiler wasn't ready for collecting stacks", comm) + raise output = ap_proc.read_output() if output is None: @@ -1274,7 +1394,8 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration return self._profiling_error_stack("error", "process exited before reading the output", comm) else: logger.info(f"Finished profiling process {ap_proc.process.pid}") - return parse_one_collapsed(output, comm) + dumped_stacks = parse_one_collapsed(output, comm) + return dumped_stacks def _check_hotspot_error(self, ap_proc: AsyncProfiledProcess) -> None: pid = ap_proc.process.pid @@ -1334,6 +1455,9 @@ def stop(self) -> None: if self._enabled_proc_events_java: proc_events.unregister_exit_callback(self._proc_exit_callback) self._enabled_proc_events_java = False + pids_to_remove = list(self._pid_to_ap_proc.keys()) + for pid in pids_to_remove: + self._stop_profiling_for_pid(pid) super().stop() def _proc_exit_callback(self, tid: int, pid: int, exit_code: int) -> None: @@ -1402,6 +1526,11 @@ def _handle_new_kernel_messages(self) -> None: else: self._handle_kernel_messages(messages) + def _stop_profiling_for_pid(self, pid: int) -> None: + ap_proc = self._pid_to_ap_proc.pop(pid, None) + if ap_proc is not None: + ap_proc.teardown() + def snapshot(self) -> ProcessToProfileData: try: return super().snapshot() @@ -1411,4 +1540,5 
@@ def snapshot(self) -> ProcessToProfileData: self._want_to_profile_pids -= self._pids_to_remove for pid in self._pids_to_remove: self._pid_to_java_version.pop(pid, None) + self._stop_profiling_for_pid(pid) self._pids_to_remove.clear() diff --git a/scripts/async_profiler_build_shared.sh b/scripts/async_profiler_build_shared.sh index b61266cec..9d2dbe58f 100755 --- a/scripts/async_profiler_build_shared.sh +++ b/scripts/async_profiler_build_shared.sh @@ -5,10 +5,10 @@ # set -euo pipefail -VERSION=v2.10g2 -GIT_REV="40b850a4101756bc398051661d1adbbe5d7e2211" +VERSION=timeout-recycle +GIT_REV="0107623aee6b96e081bf9b12bb964d6533fff1db" -git clone --depth 1 -b "$VERSION" https://github.com/Granulate/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV" +git clone --depth 1 -b "$VERSION" https://github.com/marcin-ol/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV" make all # add a version file to the build directory diff --git a/tests/test_java.py b/tests/test_java.py index 28159ab57..65e3cd79d 100644 --- a/tests/test_java.py +++ b/tests/test_java.py @@ -32,6 +32,7 @@ from gprofiler.profilers.java import ( JAVA_SAFEMODE_ALL, AsyncProfiledProcess, + AsyncProfilerStartStatus, JavaFlagCollectionOptions, JavaProfiler, _get_process_ns_java_path, @@ -123,7 +124,7 @@ def test_async_profiler_already_running( ap_safemode=0, ap_args="", ) as ap_proc: - assert ap_proc.start_async_profiler(frequency_to_ap_interval(11)) + assert ap_proc.start_async_profiler(frequency_to_ap_interval(11)) == AsyncProfilerStartStatus.STARTED_BY_US assert any("libasyncProfiler.so" in m.path for m in process.memory_maps()) # run "status" with AsyncProfiledProcessForTests( @@ -255,7 +256,9 @@ def test_hotspot_error_file( start_async_profiler = AsyncProfiledProcess.start_async_profiler # Simulate crashing process - def start_async_profiler_and_crash(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> bool: + def start_async_profiler_and_crash( + self: 
AsyncProfiledProcess, *args: Any, **kwargs: Any + ) -> AsyncProfilerStartStatus: result = start_async_profiler(self, *args, **kwargs) self.process.send_signal(signal.SIGBUS) return result @@ -367,7 +370,10 @@ def test_async_profiler_stops_after_given_timeout( ap_safemode=0, ap_args="", ) as ap_proc: - assert ap_proc.start_async_profiler(frequency_to_ap_interval(11), ap_timeout=timeout_s) + assert ( + ap_proc.start_async_profiler(frequency_to_ap_interval(11), ap_timeout=timeout_s) + == AsyncProfilerStartStatus.STARTED_BY_US + ) ap_proc.status_async_profiler() assert "Profiling is running for " in cast_away_optional(ap_proc.read_output()) @@ -444,7 +450,7 @@ def _filter_record(r: LogRecord) -> bool: r.message == "Running command" and len(log_record_extra(r)["command"]) >= 6 and log_record_extra(r)["command"][1] == "jattach" - and any(map(lambda k: k in log_record_extra(r)["command"][5], ["start,", "stop,"])) + and any(map(lambda k: k in log_record_extra(r)["command"][5], ["start,", "stop,", "dump,"])) ) return list(filter(_filter_record, records)) @@ -488,12 +494,14 @@ def test_java_noexec_dirs( # should use this path instead of /tmp/gprofiler_tmp/... 
jattach_loads = filter_jattach_load_records(caplog.records) - # 2 entries - start and stop + # 2 entries - start and dump assert len(jattach_loads) == 2 # 3rd part of commandline to AP - shall begin with non-default directory assert all( log_record_extra(jl)["command"][3].startswith("/run/gprofiler_tmp/async-profiler-") for jl in jattach_loads ) + # test the invoked async-profiler actions against the sequence start, dump + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump"] @pytest.mark.parametrize("in_container", [True]) @@ -537,10 +545,12 @@ def test_java_symlinks_in_paths( assert_collapsed(snapshot_pid_collapsed(profiler, application_pid)) jattach_loads = filter_jattach_load_records(caplog.records) - # 2 entries - start and stop + # 2 entries - start and dump assert len(jattach_loads) == 2 # 3rd part of commandline to AP - shall begin with the final, resolved path. assert all(log_record_extra(jl)["command"][3].startswith("/run/final_tmp/gprofiler_tmp/") for jl in jattach_loads) + # test the invoked async-profiler actions against the sequence start, dump + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump"] @pytest.mark.parametrize("in_container", [True]) # only in container is enough @@ -560,7 +570,9 @@ def test_java_appid_and_metadata_before_process_exits( start_async_profiler = AsyncProfiledProcess.start_async_profiler # Make the process exit before profiling ends - def start_async_profiler_and_interrupt(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> bool: + def start_async_profiler_and_interrupt( + self: AsyncProfiledProcess, *args: Any, **kwargs: Any + ) -> AsyncProfilerStartStatus: result = start_async_profiler(self, *args, **kwargs) time.sleep(3) self.process.send_signal(signal.SIGINT) @@ -590,7 +602,8 @@ def test_java_attach_socket_missing( profiler_state: ProfilerState, ) -> None: """ - Tests that we get the proper 
JattachMissingSocketException when the attach socket is deleted. + Tests that we get the proper JattachSocketMissingException when the attach socket is deleted. + This indicates that socket won't be recreated and we won't be able to utilize jattach. """ with make_java_profiler( profiler_state, @@ -606,6 +619,44 @@ def test_java_attach_socket_missing( assert next(iter(profile.stacks.keys())) == "java;[Profiling error: exception JattachSocketMissingException]" +@pytest.mark.parametrize("in_container", [True]) # only in container is enough +def test_async_profiler_dump_profiling_recovery( + application_pid: int, + profiler_state: ProfilerState, + assert_collapsed: AssertInCollapsed, + caplog: LogCaptureFixture, +) -> None: + """ + Tests that async-profiler can recover from a timeout. + Continuous profiling with dump can fail, if dump call gets delayed. + Another profiling cycle should recover from timeout and start profiler again. + """ + caplog.set_level(logging.DEBUG) + with make_java_profiler( + profiler_state, + duration=1, + # java_jattach_timeout=31, + ) as profiler: + profiler._ap_timeout = 2 + assert_collapsed(snapshot_pid_collapsed(profiler, application_pid)) + + # delay next snapshot to trigger timeout + time.sleep(3) + round2_stacks = snapshot_pid_collapsed(profiler, application_pid) + assert round2_stacks == Counter({"java;[Profiling error: profiler wasn't ready for collecting stacks]": 1}) + + # start another round to test recovery of async-profiler + caplog.clear() + round3_stacks = snapshot_pid_collapsed(profiler, application_pid) + assert_collapsed(round3_stacks) + # verify async-profiler was started before another dump + jattach_loads = filter_jattach_load_records(caplog.records) + # 2 entries - start and dump + assert len(jattach_loads) == 2 + # test the invoked async-profiler actions against the sequence start, dump + assert [log_record_extra(jl)["command"][5].split(",", 1)[0] for jl in jattach_loads] == ["start", "dump"] + + # we know what 
messages to expect when in container, not on the host Java @pytest.mark.parametrize("in_container", [True]) def test_java_jattach_async_profiler_log_output( @@ -831,21 +882,21 @@ def test_no_stray_output_in_stdout_stderr( assert_collapsed: AssertInCollapsed, profiler_state: ProfilerState, ) -> None: - # save original stop function - stop_async_profiler = AsyncProfiledProcess.stop_async_profiler + # save original dump function + dump_from_async_profiler = AsyncProfiledProcess.dump_from_async_profiler - # replace async profiler stop routine to trigger flushing standard output - def flush_output_and_stop_async_profiler(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> str: + # replace async profiler dump routine to trigger flushing standard output + def flush_output_and_dump_from_async_profiler(self: AsyncProfiledProcess, *args: Any, **kwargs: Any) -> str: # Call 'version' action on async-profiler to make sure writes to stdout are flushed. Handling of 'version' # action involves calling flush on output stream: # (https://github.com/Granulate/async-profiler/blob/58c62fe4e816b60907ca84e315936834fc1cbae4/src/profiler.cpp#L1548) self._run_async_profiler( self._get_base_cmd() + [f"version" f",log={self._log_path_process}"], ) - result = stop_async_profiler(self, *args, **kwargs) + result = dump_from_async_profiler(self, *args, **kwargs) return result - monkeypatch.setattr(AsyncProfiledProcess, "stop_async_profiler", flush_output_and_stop_async_profiler) + monkeypatch.setattr(AsyncProfiledProcess, "dump_from_async_profiler", flush_output_and_dump_from_async_profiler) with make_java_profiler( profiler_state,