diff --git a/.github/labeler.yml b/.github/labeler.yml new file mode 100644 index 0000000000000..9359bd0ca70a6 --- /dev/null +++ b/.github/labeler.yml @@ -0,0 +1,3 @@ +# See https://github.com/actions/labeler +port-to-master: '**' +port-to-v1.10: '**' diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000000000..7ed825781c53b --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,14 @@ + +## PR Description + +_What does this PR do?_ + +## Checklist + +Requirements for merging: +- [ ] I have opened an issue or PR upstream on JuliaLang/julia: +- [ ] I have removed the `port-to-*` labels that don't apply. +- [ ] I have opened a PR on raicode to test these changes: diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml new file mode 100644 index 0000000000000..2141a906e96cd --- /dev/null +++ b/.github/workflows/labeler.yml @@ -0,0 +1,17 @@ +# See https://github.com/actions/labeler +name: "Pull Request Labeler" +on: + pull_request_target: + types: + - opened + +jobs: + triage: + permissions: + contents: read + pull-requests: write + runs-on: ubuntu-latest + steps: + - uses: actions/labeler@v4 + with: + dot: true diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml new file mode 100644 index 0000000000000..3df2093491753 --- /dev/null +++ b/.github/workflows/stale.yml @@ -0,0 +1,16 @@ +name: "Close stale PRs" +on: + schedule: + - cron: "0 0 * * *" # every night at midnight + +jobs: + stale: + runs-on: ubuntu-latest + steps: + - uses: actions/stale@v8 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + stale-pr-message: 'This PR is stale because it has been open 30 days with no activity. Comment or remove stale label, or this PR will be closed in 5 days.' + days-before-stale: 30 + days-before-close: 5 + stale-pr-label: 'stale' diff --git a/.github/workflows/update-upstream-branches.yml b/.github/workflows/update-upstream-branches.yml new file mode 100644 index 0000000000000..247000bbd42cd --- /dev/null +++ b/.github/workflows/update-upstream-branches.yml @@ -0,0 +1,28 @@ +name: "Update upstream branches" +on: + schedule: + - cron: "0 0 * * *" # every night at midnight + workflow_dispatch: + +jobs: + PullUpstream: + runs-on: ubuntu-latest + strategy: + fail-fast: false # run all jobs in the matrix even if one fails + matrix: + branch: + - "master" + - "backports-release-1.10" + steps: + - name: Checkout RAI/julia + uses: actions/checkout@v3 + with: + ref: ${{ matrix.branch }} + - name: Update ${{ matrix.branch }} + run: | + git config --global user.email "julia-engineering@relational.ai" + git config --global user.name "RAI CI (GitHub Action Automation)" + + git remote add upstream https://github.com/JuliaLang/julia + git pull upstream ${{ matrix.branch }} + git push origin ${{ matrix.branch }} diff --git a/VERSION b/VERSION index 6b89d58f861a7..bd8abae2b13ee 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.12.2 +1.12.2+RAI diff --git a/base/loading.jl b/base/loading.jl index 316f652c78fa3..a133b04370ed8 100644 --- a/base/loading.jl +++ b/base/loading.jl @@ -4208,6 +4208,20 @@ function expand_compiler_path(tup) end compiler_chi(tup::Tuple) = CacheHeaderIncludes(expand_compiler_path(tup)) +""" + isprecompilable(f, argtypes::Tuple{Vararg{Any}}) + +Check, as far as is possible without actually compiling, if the given +function `f` can be compiled for the argument tuple (of types) `argtypes`. +""" +function isprecompilable(@nospecialize(f), @nospecialize(argtypes::Tuple)) + isprecompilable(Tuple{Core.Typeof(f), argtypes...}) +end + +function isprecompilable(@nospecialize(argt::Type)) + ccall(:jl_is_compilable, Int32, (Any,), argt) != 0 +end + """ precompile(f, argtypes::Tuple{Vararg{Any}}) diff --git a/base/options.jl b/base/options.jl index 1692c765443f3..b5730994492aa 100644 --- a/base/options.jl +++ b/base/options.jl @@ -64,6 +64,7 @@ struct JLOptions trim::Int8 task_metrics::Int8 timeout_for_safepoint_straggler_s::Int16 + safe_crash_log_file::Ptr{UInt8} end # This runs early in the sysimage != is not defined yet diff --git a/base/partr.jl b/base/partr.jl index d488330f0c87e..671c88fd6e786 100644 --- a/base/partr.jl +++ b/base/partr.jl @@ -97,7 +97,7 @@ function multiq_sift_down(heap::taskheap, idx::Int32) child = Int(child) child > length(heap.tasks) && break if isassigned(heap.tasks, child) && - heap.tasks[child].priority < heap.tasks[idx].priority + heap.tasks[child].priority <= heap.tasks[idx].priority t = heap.tasks[idx] heap.tasks[idx] = heap.tasks[child] heap.tasks[child] = t diff --git a/deps/openblas.mk b/deps/openblas.mk index e5a988ba84df2..80881a6a13c4e 100644 --- a/deps/openblas.mk +++ b/deps/openblas.mk @@ -44,7 +44,7 @@ OPENBLAS_CFLAGS := -O2 # Decide whether to build for 32-bit or 64-bit arch ifneq ($(XC_HOST),) -OPENBLAS_BUILD_OPTS += OSNAME=$(OS) CROSS=1 HOSTCC=$(HOSTCC) CROSS_SUFFIX=$(CROSS_COMPILE) +OPENBLAS_BUILD_OPTS += OSNAME=$(OS) CROSS=1 HOSTCC="$(HOSTCC)" CROSS_SUFFIX=$(CROSS_COMPILE) endif ifeq ($(OS),WINNT) ifneq ($(ARCH),x86_64) diff --git a/src/gc-pages.c b/src/gc-pages.c index 71d59de29166f..79dd8993a8861 100644 --- a/src/gc-pages.c +++ b/src/gc-pages.c @@ -28,6 +28,28 @@ JL_DLLEXPORT uint64_t jl_get_pg_size(void) static int block_pg_cnt = DEFAULT_BLOCK_PG_ALLOC; +// Julia allocates large blocks (64M) with mmap. These are never +// unmapped but the underlying physical memory may be released +// with calls to madvise(MADV_DONTNEED). +static uint64_t poolmem_blocks_allocated_total = 0; + +JL_DLLEXPORT uint64_t jl_poolmem_blocks_allocated_total(void) +{ + return poolmem_blocks_allocated_total; +} + +JL_DLLEXPORT uint64_t jl_poolmem_bytes_allocated(void) +{ + return jl_atomic_load_relaxed(&gc_heap_stats.bytes_resident); +} + +JL_DLLEXPORT uint64_t jl_current_pg_count(void) +{ + assert(jl_page_size == GC_PAGE_SZ && "RAI fork of Julia should be running on platforms for which jl_page_size == GC_PAGE_SZ"); + size_t nb = jl_atomic_load_relaxed(&gc_heap_stats.bytes_resident); + return nb / GC_PAGE_SZ; // exact division +} + void jl_gc_init_page(void) { if (GC_PAGE_SZ * block_pg_cnt < jl_page_size) @@ -55,6 +77,11 @@ char *jl_gc_try_alloc_pages_(int pg_cnt) JL_NOTSAFEPOINT MAP_NORESERVE | MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if (mem == MAP_FAILED) return NULL; + +#ifdef MADV_NOHUGEPAGE + madvise(mem, pages_sz, MADV_NOHUGEPAGE); +#endif + #endif if (GC_PAGE_SZ > jl_page_size) // round data pointer up to the nearest gc_page_data-aligned @@ -62,6 +89,7 @@ char *jl_gc_try_alloc_pages_(int pg_cnt) JL_NOTSAFEPOINT mem = (char*)gc_page_data(mem + GC_PAGE_SZ - 1); jl_atomic_fetch_add_relaxed(&gc_heap_stats.bytes_mapped, pages_sz); jl_atomic_fetch_add_relaxed(&gc_heap_stats.bytes_resident, pages_sz); + poolmem_blocks_allocated_total++; // RAI-specific return mem; } @@ -184,7 +212,7 @@ void jl_gc_free_page(jl_gc_pagemeta_t *pg) JL_NOTSAFEPOINT } #ifdef _OS_WINDOWS_ VirtualFree(p, decommit_size, MEM_DECOMMIT); -#elif defined(MADV_FREE) +#elif 0 static int supports_madv_free = 1; if (supports_madv_free) { if (madvise(p, decommit_size, MADV_FREE) == -1) { diff --git a/src/gc-stacks.c b/src/gc-stacks.c index 9387c7fb065ec..55c4a470dd33e 100644 --- a/src/gc-stacks.c +++ b/src/gc-stacks.c @@ -72,8 +72,10 @@ static void *malloc_stack(size_t bufsz) JL_NOTSAFEPOINT munmap(stk, bufsz); return MAP_FAILED; } -# endif - +#ifdef MADV_NOHUGEPAGE + madvise(stk, bufsz, MADV_NOHUGEPAGE); +#endif +#endif jl_atomic_fetch_add_relaxed(&num_stack_mappings, 1); return stk; } diff --git a/src/gc-stock.c b/src/gc-stock.c index 55f31b26679ff..0d15da602caf0 100644 --- a/src/gc-stock.c +++ b/src/gc-stock.c @@ -3397,6 +3397,9 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) return recollect; } +extern int jl_heartbeat_pause(void); +extern int jl_heartbeat_resume(void); + JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) { JL_PROBE_GC_BEGIN(collection); @@ -3439,6 +3442,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) // existence of the thread in the jl_n_threads count. // // TODO: concurrently queue objects + jl_heartbeat_pause(); jl_fence(); gc_n_threads = jl_atomic_load_acquire(&jl_n_threads); gc_all_tls_states = jl_atomic_load_relaxed(&jl_all_tls_states); @@ -3470,6 +3474,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) gc_n_threads = 0; gc_all_tls_states = NULL; + jl_heartbeat_resume(); jl_safepoint_end_gc(); jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING); JL_PROBE_GC_END(); @@ -3906,6 +3911,9 @@ void *jl_gc_perm_alloc_nolock(size_t sz, int zero, unsigned align, unsigned offs errno = last_errno; if (__unlikely(pool == MAP_FAILED)) return NULL; +#ifdef MADV_NOHUGEPAGE + madvise(pool, GC_PERM_POOL_SIZE, MADV_NOHUGEPAGE); +#endif #endif gc_perm_pool = (uintptr_t)pool; gc_perm_end = gc_perm_pool + GC_PERM_POOL_SIZE; diff --git a/src/gc-stock.h b/src/gc-stock.h index d478ee1366da0..41e6151605d80 100644 --- a/src/gc-stock.h +++ b/src/gc-stock.h @@ -499,6 +499,9 @@ extern uv_sem_t gc_sweep_assists_needed; extern _Atomic(int) gc_n_threads_marking; extern _Atomic(int) gc_n_threads_sweeping_pools; extern _Atomic(int) n_threads_running; +extern _Atomic(int) gc_n_threads_sweeping_stacks; +extern _Atomic(int) gc_ptls_sweep_idx; +extern _Atomic(int) gc_stack_free_idx; extern uv_barrier_t thread_init_done; void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq); void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t *fl_parent, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT; diff --git a/src/gf.c b/src/gf.c index 47db5474701ac..59604d67c584b 100644 --- a/src/gf.c +++ b/src/gf.c @@ -3890,6 +3890,15 @@ JL_DLLEXPORT void jl_compile_method_sig(jl_method_t *m, jl_value_t *types, jl_sv jl_compile_method_instance(mi, NULL, world); } +JL_DLLEXPORT int jl_is_compilable(jl_tupletype_t *types) +{ + size_t world = jl_atomic_load_acquire(&jl_world_counter); + size_t min_valid = 0; + size_t max_valid = ~(size_t)0; + jl_method_instance_t *mi = jl_get_compile_hint_specialization(types, world, &min_valid, &max_valid, 1); + return mi == NULL ? 0 : 1; +} + JL_DLLEXPORT int jl_compile_hint(jl_tupletype_t *types) { size_t world = jl_atomic_load_acquire(&jl_world_counter); diff --git a/src/init.c b/src/init.c index e11b360b5d378..a0d2719230e77 100644 --- a/src/init.c +++ b/src/init.c @@ -553,6 +553,8 @@ extern jl_mutex_t newly_inferred_mutex; extern jl_mutex_t global_roots_lock; extern jl_mutex_t profile_show_peek_cond_lock; +extern void jl_init_heartbeat(void); + static void restore_fp_env(void) { if (jl_set_zero_subnormals(0) || jl_set_default_nans(0)) { @@ -612,6 +614,11 @@ static NOINLINE void _finish_jl_init_(jl_image_buf_t sysimage, jl_ptls_t ptls, j jl_start_gc_threads(); uv_barrier_wait(&thread_init_done); + if (jl_base_module != NULL) { + // requires code in Base + jl_init_heartbeat(); + } + jl_gc_enable(1); if ((sysimage.kind != JL_IMAGE_KIND_NONE) && @@ -750,6 +757,15 @@ JL_DLLEXPORT void jl_init_(jl_image_buf_t sysimage) if (jl_options.handle_signals == JL_OPTIONS_HANDLE_SIGNALS_ON) jl_install_default_signal_handlers(); +#if (defined(_OS_LINUX_) && defined(_CPU_X86_64_)) || (defined(_OS_DARWIN_) && defined(_CPU_AARCH64_)) + if (jl_options.safe_crash_log_file != NULL) { + jl_sig_fd = open(jl_options.safe_crash_log_file, O_WRONLY | O_CREAT | O_APPEND, 0600); + if (jl_sig_fd == -1) { + jl_error("fatal error: could not open safe crash log file for writing"); + } + } +#endif + jl_gc_init(); arraylist_new(&jl_linkage_blobs, 0); diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 61420c7306de9..1a74581745972 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -271,6 +271,7 @@ XX(jl_istopmod) \ XX(jl_is_binding_deprecated) \ XX(jl_is_char_signed) \ + XX(jl_is_compilable) \ XX(jl_is_const) \ XX(jl_is_assertsbuild) \ XX(jl_is_debugbuild) \ diff --git a/src/jl_uv.c b/src/jl_uv.c index a21b05433b8c6..5d8ba54c06c4e 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -15,6 +15,7 @@ #include "errno.h" #include #include +#include #endif #include "julia.h" @@ -813,6 +814,83 @@ JL_DLLEXPORT int jl_printf(uv_stream_t *s, const char *format, ...) return c; } +STATIC_INLINE int copystp(char *dest, const char *src) +{ + char *d = stpcpy(dest, src); + return (int)(d - dest); +} + +// RAI-specific +STATIC_INLINE void write_to_safe_crash_log(char *buf) JL_NOTSAFEPOINT +{ + int buflen = strlen(buf); + // Our telemetry on SPCS expects a JSON object per line. + // We ignore write failures because there is nothing we can do. + // We'll use a 2K byte buffer: 69 bytes for JSON message decorations, + // 1 byte for the terminating NUL character, and 3 bytes for an + // ellipsis if we have to truncate the message leaves `max_b` bytes + // for the message. + const int wbuflen = 2048; + const int max_b = wbuflen - 70 - 3; + char wbuf[wbuflen]; + bzero(wbuf, wbuflen); + int wlen = 0; + + // JSON preamble (32 bytes) + wlen += copystp(&wbuf[wlen], "\n{\"level\":\"Error\", \"timestamp\":\""); + + // Timestamp (19 bytes) + struct timeval tv; + struct tm* tm_info; + gettimeofday(&tv, NULL); + tm_info = gmtime(&tv.tv_sec); + wlen += strftime(&wbuf[wlen], 42, "%Y-%m-%dT%H:%M:%S", tm_info); + sprintf(&wbuf[wlen], ".%03ld", (long)tv.tv_usec / 1000); + wlen += 4; + + // JSON preamble to message (15 bytes) + wlen += copystp(&wbuf[wlen], "\", \"message\": \""); + + // Message + // Each iteration will advance wlen by 1 or 2 + for (size_t i = 0; i < buflen; i++) { + // Truncate the message if the write buffer is full + if (wlen == max_b || wlen == max_b - 1) { + wlen += copystp(&wbuf[wlen], "..."); + break; + } + switch (buf[i]) { + case '"': + wlen += copystp(&wbuf[wlen], "\\\""); + break; + case '\b': + wlen += copystp(&wbuf[wlen], "\\b"); + break; + case '\n': + wlen += copystp(&wbuf[wlen], "\\n"); + break; + case '\r': + wlen += copystp(&wbuf[wlen], "\\r"); + break; + case '\t': + wlen += copystp(&wbuf[wlen], "\\t"); + break; + case '\\': + wlen += copystp(&wbuf[wlen], "\\\\"); + break; + default: + wbuf[wlen++] = buf[i]; + break; + } + } + // JSON completion (3 bytes) + wlen += copystp(&wbuf[wlen], "\"}\n"); + write(jl_sig_fd, wbuf, wlen); + fdatasync(jl_sig_fd); +} + +extern int jl_inside_heartbeat_thread(void); + JL_DLLEXPORT void jl_safe_printf(const char *fmt, ...) { static char buf[1000]; @@ -829,6 +907,12 @@ JL_DLLEXPORT void jl_safe_printf(const char *fmt, ...) va_end(args); buf[999] = '\0'; + // order is important here: we want to ensure that the threading infra + // has been initialized before we start trying to print to the + // safe crash log file + if (jl_sig_fd != 0 && (jl_inside_signal_handler() || jl_inside_heartbeat_thread())) { + write_to_safe_crash_log(buf); + } if (write(STDERR_FILENO, buf, strlen(buf)) < 0) { // nothing we can do; ignore the failure } diff --git a/src/jloptions.c b/src/jloptions.c index 96cce1c8d29a3..b312a6c39166e 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -155,6 +155,7 @@ JL_DLLEXPORT void jl_init_options(void) JL_TRIM_NO, // trim 0, // task_metrics -1, // timeout_for_safepoint_straggler_s + NULL, // safe_crash_log_file }; jl_options_initialized = 1; } @@ -384,6 +385,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) opt_permalloc_pkgimg, opt_trim, opt_experimental_features, + opt_safe_crash_log_file, }; static const char* const shortopts = "+vhqH:e:E:L:J:C:it:p:O:g:m:"; static const struct option longopts[] = { @@ -452,6 +454,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) { "permalloc-pkgimg",required_argument, 0, opt_permalloc_pkgimg }, { "heap-size-hint", required_argument, 0, opt_heap_size_hint }, { "trim", optional_argument, 0, opt_trim }, + { "safe-crash-log-file", required_argument, 0, opt_safe_crash_log_file }, { 0, 0, 0, 0 } }; @@ -1011,6 +1014,10 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) jl_options.task_metrics = JL_OPTIONS_TASK_METRICS_ON; else jl_errorf("julia: invalid argument to --task-metrics={yes|no} (%s)", optarg); + case opt_safe_crash_log_file: + jl_options.safe_crash_log_file = strdup(optarg); + if (jl_options.safe_crash_log_file == NULL) + jl_error("julia: failed to allocate memory for --safe-crash-log-file"); break; default: jl_errorf("julia: unhandled option -- %c\n" diff --git a/src/jloptions.h b/src/jloptions.h index 06e00e9309dba..f5c8f72a2cb6c 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -68,6 +68,7 @@ typedef struct { int8_t trim; int8_t task_metrics; int16_t timeout_for_safepoint_straggler_s; + const char *safe_crash_log_file; } jl_options_t; #endif diff --git a/src/julia_internal.h b/src/julia_internal.h index c9e1b0e204df6..52832b32b25e0 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -230,6 +230,7 @@ extern volatile size_t profile_bt_size_max; extern volatile size_t profile_bt_size_cur; extern volatile int profile_running; extern volatile int profile_all_tasks; +extern int heartbeat_tid; // Mostly used to ensure we skip this thread in the CPU profiler. XXX: not implemented on Windows // Ensures that we can safely read the `live_tasks`field of every TLS when profiling. // We want to avoid the case that a GC gets interleaved with `jl_profile_task` and shrinks // the `live_tasks` array while we are reading it or frees tasks that are being profiled. @@ -245,6 +246,7 @@ extern uv_mutex_t bt_data_prof_lock; #define PROFILE_STATE_THREAD_NOT_SLEEPING (1) #define PROFILE_STATE_THREAD_SLEEPING (2) #define PROFILE_STATE_WALL_TIME_PROFILING (3) +extern _Atomic(int) n_threads_running; void jl_profile_task(void); // number of cycles since power-on @@ -780,6 +782,32 @@ JL_CALLABLE(jl_f_tuple); void jl_install_default_signal_handlers(void); void restore_signals(void); void jl_install_thread_signal_handler(jl_ptls_t ptls); +extern const size_t sig_stack_size; +STATIC_INLINE int is_addr_on_sigstack(jl_ptls_t ptls, void *ptr) +{ + // One guard page for signal_stack. + return !((char*)ptr < (char*)ptls->signal_stack - jl_page_size || + (char*)ptr > (char*)ptls->signal_stack + sig_stack_size); +} +STATIC_INLINE int jl_inside_signal_handler(void) +{ +#if (defined(_OS_LINUX_) && defined(_CPU_X86_64_)) || (defined(_OS_DARWIN_) && defined(_CPU_AARCH64_)) + // Read the stack pointer + size_t sp; +#if defined(_OS_LINUX_) && defined(_CPU_X86_64_) + __asm__ __volatile__("movq %%rsp, %0" : "=r"(sp)); +#elif defined(_OS_DARWIN_) && defined(_CPU_AARCH64_) + __asm__ __volatile__("mov %0, sp" : "=r"(sp)); +#endif + // Check if the stack pointer is within the signal stack + jl_ptls_t ptls = jl_current_task->ptls; + return is_addr_on_sigstack(ptls, (void*)sp); +#else + return 0; +#endif +} +// File-descriptor for safe logging on signal handling +extern int jl_sig_fd; extern uv_loop_t *jl_io_loop; JL_DLLEXPORT void jl_uv_flush(uv_stream_t *stream); @@ -1499,9 +1527,9 @@ JL_DLLEXPORT jl_value_t *jl_backtrace_from_here(int returnsp, int skip); void jl_critical_error(int sig, int si_code, bt_context_t *context, jl_task_t *ct); JL_DLLEXPORT void jl_raise_debugger(void) JL_NOTSAFEPOINT; JL_DLLEXPORT void jl_gdblookup(void* ip) JL_NOTSAFEPOINT; -void jl_print_native_codeloc(uintptr_t ip) JL_NOTSAFEPOINT; -void jl_print_bt_entry_codeloc(jl_bt_element_t *bt_data) JL_NOTSAFEPOINT; -JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT; +void jl_print_native_codeloc(char *pre_str, uintptr_t ip) JL_NOTSAFEPOINT; +void jl_print_bt_entry_codeloc(int sig, jl_bt_element_t *bt_data) JL_NOTSAFEPOINT; +JL_DLLEXPORT void jl_print_task_backtraces(int show_done)JL_NOTSAFEPOINT ; #ifdef _OS_WINDOWS_ JL_DLLEXPORT void jl_refresh_dbg_module_list(void); #endif diff --git a/src/options.h b/src/options.h index 0715069faab32..fb2797ffd0336 100644 --- a/src/options.h +++ b/src/options.h @@ -144,6 +144,9 @@ #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 +// heartbeats +#define JL_HEARTBEAT_THREAD + // sanitizer defaults --------------------------------------------------------- // Automatically enable MEMDEBUG and KEEP_BODIES for the sanitizers diff --git a/src/safepoint.c b/src/safepoint.c index 970c48875d790..96da3c1a05eb1 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -172,7 +172,7 @@ void jl_gc_wait_for_the_world(jl_ptls_t* gc_all_tls_states, int gc_n_threads) size_t bt_size = jl_try_record_thread_backtrace(ptls2, ptls->bt_data, JL_MAX_BT_SIZE); // Print the backtrace of the straggler for (size_t i = 0; i < bt_size; i += jl_bt_entry_size(ptls->bt_data + i)) { - jl_print_bt_entry_codeloc(ptls->bt_data + i); + jl_print_bt_entry_codeloc(-1, ptls->bt_data + i); } } } diff --git a/src/signal-handling.c b/src/signal-handling.c index b03f0c1a430cd..269885bb5c14c 100644 --- a/src/signal-handling.c +++ b/src/signal-handling.c @@ -30,6 +30,8 @@ static const uint64_t GIGA = 1000000000ULL; // Timers to take samples at intervals JL_DLLEXPORT void jl_profile_stop_timer(void); JL_DLLEXPORT int jl_profile_start_timer(uint8_t); +// File-descriptor for safe logging on signal handling +int jl_sig_fd; /////////////////////// // Utility functions // @@ -647,7 +649,7 @@ void jl_critical_error(int sig, int si_code, bt_context_t *context, jl_task_t *c *bt_size = n = rec_backtrace_ctx(bt_data, JL_MAX_BT_SIZE, context, NULL); } for (i = 0; i < n; i += jl_bt_entry_size(bt_data + i)) { - jl_print_bt_entry_codeloc(bt_data + i); + jl_print_bt_entry_codeloc(sig, bt_data + i); } jl_gc_debug_print_status(); jl_gc_debug_critical_error(); diff --git a/src/signals-mach.c b/src/signals-mach.c index 4a670547bdfcd..12d545ee81ce6 100644 --- a/src/signals-mach.c +++ b/src/signals-mach.c @@ -824,6 +824,10 @@ void *mach_profile_listener(void *arg) for (int idx = nthreads; idx-- > 0; ) { // Stop the threads in random order. int i = randperm[idx]; + // skip heartbeat thread + if (i == heartbeat_tid) { + continue; + } jl_profile_thread_mach(i); } } diff --git a/src/signals-unix.c b/src/signals-unix.c index 2db397050420e..ed186d5b357cd 100644 --- a/src/signals-unix.c +++ b/src/signals-unix.c @@ -42,7 +42,7 @@ #endif // 8M signal stack, same as default stack size (though we barely use this) -static const size_t sig_stack_size = 8 * 1024 * 1024; +const size_t sig_stack_size = 8 * 1024 * 1024; #include "julia_assert.h" @@ -102,14 +102,6 @@ static inline uintptr_t jl_get_rsp_from_ctx(const void *_ctx) #endif } -static int is_addr_on_sigstack(jl_ptls_t ptls, void *ptr) JL_NOTSAFEPOINT -{ - // One guard page for signal_stack. - return ptls->signal_stack == NULL || - ((char*)ptr >= (char*)ptls->signal_stack - jl_page_size && - (char*)ptr <= (char*)ptls->signal_stack + (ptls->signal_stack_size ? ptls->signal_stack_size : sig_stack_size)); -} - // Modify signal context `_ctx` so that `fptr` will execute when the signal returns // The function `fptr` itself must not return. JL_NO_ASAN static void jl_call_in_ctx(jl_ptls_t ptls, void (*fptr)(void), int sig, void *_ctx) @@ -873,6 +865,10 @@ static void do_profile(void *ctx) for (int idx = nthreads; idx-- > 0; ) { // Stop the threads in the random order. int tid = randperm[idx]; + // skip heartbeat thread + if (tid == heartbeat_tid) { + return; + } // do backtrace for profiler if (!profile_running) return; @@ -1111,7 +1107,7 @@ static void *signal_listener(void *arg) jl_safe_printf("\nsignal (%d): %s\n", sig, strsignal(sig)); size_t i; for (i = 0; i < signal_bt_size; i += jl_bt_entry_size(signal_bt_data + i)) { - jl_print_bt_entry_codeloc(signal_bt_data + i); + jl_print_bt_entry_codeloc(sig, signal_bt_data + i); } } } diff --git a/src/signals-win.c b/src/signals-win.c index c8ae74f52dba4..d9c7ffd5ae769 100644 --- a/src/signals-win.c +++ b/src/signals-win.c @@ -4,7 +4,7 @@ // Note that this file is `#include`d by "signal-handling.c" #include // hidden by LEAN_AND_MEAN -static const size_t sig_stack_size = 131072; // 128k reserved for backtrace_fiber for stack overflow handling +const size_t sig_stack_size = 131072; // 128k reserved for backtrace_fiber for stack overflow handling // Copied from MINGW_FLOAT_H which may not be found due to a collision with the builtin gcc float.h // eventually we can probably integrate this into OpenLibm. @@ -333,7 +333,7 @@ LONG WINAPI jl_exception_handler(struct _EXCEPTION_POINTERS *ExceptionInfo) jl_safe_printf("UNKNOWN"); break; } jl_safe_printf(" at 0x%zx -- ", (size_t)ExceptionInfo->ExceptionRecord->ExceptionAddress); - jl_print_native_codeloc((uintptr_t)ExceptionInfo->ExceptionRecord->ExceptionAddress); + jl_print_native_codeloc("", (uintptr_t)ExceptionInfo->ExceptionRecord->ExceptionAddress); jl_critical_error(0, 0, ExceptionInfo->ContextRecord, ct); static int recursion = 0; diff --git a/src/stackwalk.c b/src/stackwalk.c index 50566b46ff45a..b04142f7cf845 100644 --- a/src/stackwalk.c +++ b/src/stackwalk.c @@ -637,22 +637,25 @@ JL_DLLEXPORT jl_value_t *jl_lookup_code_address(void *ip, int skipC) return rs; } -static void jl_safe_print_codeloc(const char* func_name, const char* file_name, +static void jl_safe_print_codeloc(const char *pre_str, + const char* func_name, const char* file_name, int line, int inlined) JL_NOTSAFEPOINT { const char *inlined_str = inlined ? " [inlined]" : ""; if (line != -1) { - jl_safe_printf("%s at %s:%d%s\n", func_name, file_name, line, inlined_str); + jl_safe_printf("%s%s at %s:%d%s\n", + pre_str, func_name, file_name, line, inlined_str); } else { - jl_safe_printf("%s at %s (unknown line)%s\n", func_name, file_name, inlined_str); + jl_safe_printf("%s%s at %s (unknown line)%s\n", + pre_str, func_name, file_name, inlined_str); } } // Print function, file and line containing native instruction pointer `ip` by // looking up debug info. Prints multiple such frames when `ip` points to // inlined code. -void jl_print_native_codeloc(uintptr_t ip) JL_NOTSAFEPOINT +void jl_print_native_codeloc(char *pre_str, uintptr_t ip) JL_NOTSAFEPOINT { // This function is not allowed to reference any TLS variables since // it can be called from an unmanaged thread on OSX. @@ -664,10 +667,11 @@ void jl_print_native_codeloc(uintptr_t ip) JL_NOTSAFEPOINT for (i = 0; i < n; i++) { jl_frame_t frame = frames[i]; if (!frame.func_name) { - jl_safe_printf("unknown function (ip: %p) at %s\n", (void*)ip, frame.file_name ? frame.file_name : "(unknown file)"); + jl_safe_printf("%sunknown function (ip: %p) at %s\n", pre_str, (void*)ip, frame.file_name ? frame.file_name : "(unknown file)"); } else { - jl_safe_print_codeloc(frame.func_name, frame.file_name, frame.line, frame.inlined); + jl_safe_print_codeloc(pre_str, frame.func_name, + frame.file_name, frame.line, frame.inlined); free(frame.func_name); } free(frame.file_name); @@ -725,7 +729,7 @@ const char *jl_debuginfo_name(jl_value_t *func) // func == module : top-level // func == NULL : macro expansion -static void jl_print_debugloc(jl_debuginfo_t *debuginfo, jl_value_t *func, size_t ip, int inlined) JL_NOTSAFEPOINT +static void jl_print_debugloc(const char *pre_str, jl_debuginfo_t *debuginfo, jl_value_t *func, size_t ip, int inlined) JL_NOTSAFEPOINT { if (!jl_is_symbol(debuginfo->def)) // this is a path or func = debuginfo->def; // this is inlined code @@ -734,26 +738,36 @@ static void jl_print_debugloc(jl_debuginfo_t *debuginfo, jl_value_t *func, size_ if (edges_idx) { jl_debuginfo_t *edge = (jl_debuginfo_t*)jl_svecref(debuginfo->edges, edges_idx - 1); assert(jl_typetagis(edge, jl_debuginfo_type)); - jl_print_debugloc(edge, NULL, stmt.pc, 1); + jl_print_debugloc(pre_str, edge, NULL, stmt.pc, 1); } intptr_t ip2 = stmt.line; if (ip2 >= 0 && ip > 0 && (jl_value_t*)debuginfo->linetable != jl_nothing) { - jl_print_debugloc(debuginfo->linetable, func, ip2, 0); + jl_print_debugloc(pre_str, debuginfo->linetable, func, ip2, 0); } else { if (ip2 < 0) // set broken debug info to ignored ip2 = 0; const char *func_name = jl_debuginfo_name(func); const char *file = jl_debuginfo_file(debuginfo); - jl_safe_print_codeloc(func_name, file, ip2, inlined); + jl_safe_print_codeloc(pre_str, func_name, file, ip2, inlined); } } // Print code location for backtrace buffer entry at *bt_entry -void jl_print_bt_entry_codeloc(jl_bt_element_t *bt_entry) JL_NOTSAFEPOINT +void jl_print_bt_entry_codeloc(int sig, jl_bt_element_t *bt_entry) JL_NOTSAFEPOINT { + char sig_str[32], pre_str[64]; + sig_str[0] = pre_str[0] = '\0'; + if (sig != -1) { + snprintf(sig_str, 32, "signal (%d) ", sig); + } + // do not call jl_threadid if there's no current task + if (jl_get_current_task()) { + snprintf(pre_str, 64, "%sthread (%d) ", sig_str, jl_threadid() + 1); + } + if (jl_bt_is_native(bt_entry)) { - jl_print_native_codeloc(bt_entry[0].uintptr); + jl_print_native_codeloc(pre_str, bt_entry[0].uintptr); } else if (jl_bt_entry_tag(bt_entry) == JL_BT_INTERP_FRAME_TAG) { size_t ip = jl_bt_entry_header(bt_entry); // zero-indexed @@ -772,7 +786,7 @@ void jl_print_bt_entry_codeloc(jl_bt_element_t *bt_entry) JL_NOTSAFEPOINT if (jl_is_code_info(code)) { jl_code_info_t *src = (jl_code_info_t*)code; // See also the debug info handling in codegen.cpp. - jl_print_debugloc(src->debuginfo, def, ip + 1, 0); + jl_print_debugloc(pre_str, src->debuginfo, def, ip + 1, 0); } else { // If we're using this function something bad has already happened; @@ -1361,7 +1375,13 @@ JL_DLLEXPORT jl_record_backtrace_result_t jl_record_backtrace(jl_task_t *t, jl_b JL_DLLEXPORT void jl_gdblookup(void* ip) { - jl_print_native_codeloc((uintptr_t)ip); + char pre_str[64]; + pre_str[0] = '\0'; + // do not call jl_threadid if there's no current task + if (jl_get_current_task()) { + snprintf(pre_str, 64, "thread (%d) ", jl_threadid() + 1); + } + jl_print_native_codeloc(pre_str, (uintptr_t)ip); } // Print backtrace for current exception in catch block @@ -1376,7 +1396,7 @@ JL_DLLEXPORT void jlbacktrace(void) JL_NOTSAFEPOINT size_t i, bt_size = jl_excstack_bt_size(s, s->top); jl_bt_element_t *bt_data = jl_excstack_bt_data(s, s->top); for (i = 0; i < bt_size; i += jl_bt_entry_size(bt_data + i)) { - jl_print_bt_entry_codeloc(bt_data + i); + jl_print_bt_entry_codeloc(-1, bt_data + i); } } @@ -1399,7 +1419,7 @@ JL_DLLEXPORT void jlbacktracet(jl_task_t *t) JL_NOTSAFEPOINT size_t bt_size = r.bt_size; size_t i; for (i = 0; i < bt_size; i += jl_bt_entry_size(bt_data + i)) { - jl_print_bt_entry_codeloc(bt_data + i); + jl_print_bt_entry_codeloc(-1, bt_data + i); } if (bt_size == 0) jl_safe_printf(" no backtrace recorded\n"); @@ -1410,11 +1430,30 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT jlbacktrace(); } -// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr +extern int jl_inside_heartbeat_thread(void); +extern int jl_heartbeat_pause(void); +extern int jl_heartbeat_resume(void); + +// Print backtraces for all live tasks, for all threads, to jl_safe_printf +// stderr. This can take a _long_ time! JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT { + // disable heartbeats to prevent heartbeat loss while running this, + // unless this is called from the heartbeat thread itself; in that + // situation, the thread is busy running this and it will not be + // updating the missed heartbeats counter + if (!jl_inside_heartbeat_thread()) { + jl_heartbeat_pause(); + } + size_t nthreads = jl_atomic_load_acquire(&jl_n_threads); jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states); + int ctid = -1; + // do not call jl_threadid if there's no current task + if (jl_get_current_task()) { + ctid = jl_threadid() + 1; + } + jl_safe_printf("thread (%d) ++++ Task backtraces\n", ctid); for (size_t i = 0; i < nthreads; i++) { jl_ptls_t ptls2 = allstates[i]; if (gc_is_collector_thread(i)) { @@ -1430,17 +1469,22 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT jl_task_t *t = ptls2->root_task; if (t != NULL) t_state = jl_atomic_load_relaxed(&t->_state); - jl_safe_printf("==== Thread %d created %zu live tasks\n", - ptls2->tid + 1, n + (t_state != JL_TASK_STATE_DONE)); + jl_safe_printf("thread (%d) ==== Thread %d created %zu live tasks\n", + ctid, ptls2->tid + 1, n + (t_state != JL_TASK_STATE_DONE)); if (show_done || t_state != JL_TASK_STATE_DONE) { - jl_safe_printf(" ---- Root task (%p)\n", ptls2->root_task); + jl_safe_printf("thread (%d) ---- Root task (%p)\n", ctid, ptls2->root_task); if (t != NULL) { - jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n", - t->sticky, t->ctx.started, t_state, + jl_safe_printf("thread (%d) (sticky: %d, started: %d, state: %d, tid: %d)\n", + ctid, t->sticky, t->ctx.started, t_state, jl_atomic_load_relaxed(&t->tid) + 1); - jlbacktracet(t); + if (t->ctx.stkbuf != NULL) { + jlbacktracet(t); + } + else { + jl_safe_printf("thread (%d) no stack\n", ctid); + } } - jl_safe_printf(" ---- End root task\n"); + jl_safe_printf("thread (%d) ---- End root task\n", ctid); } for (size_t j = 0; j < n; j++) { @@ -1450,17 +1494,24 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT int t_state = jl_atomic_load_relaxed(&t->_state); if (!show_done && t_state == JL_TASK_STATE_DONE) continue; - jl_safe_printf(" ---- Task %zu (%p)\n", j + 1, t); + jl_safe_printf("thread (%d) ---- Task %zu (%p)\n", ctid, j + 1, t); // n.b. this information might not be consistent with the stack printing after it, since it could start running or change tid, etc. - jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n", - t->sticky, t->ctx.started, t_state, + jl_safe_printf("thread (%d) (sticky: %d, started: %d, state: %d, tid: %d)\n", + ctid, t->sticky, t->ctx.started, t_state, jl_atomic_load_relaxed(&t->tid) + 1); - jlbacktracet(t); - jl_safe_printf(" ---- End task %zu\n", j + 1); + if (t->ctx.stkbuf != NULL) + jlbacktracet(t); + else + jl_safe_printf("thread (%d) no stack\n", ctid); + jl_safe_printf("thread (%d) ---- End task %zu\n", ctid, j + 1); } - jl_safe_printf("==== End thread %d\n", ptls2->tid + 1); + jl_safe_printf("thread (%d) ==== End thread %d\n", ctid, ptls2->tid + 1); + } + jl_safe_printf("thread (%d) ++++ Done\n", ctid); + + if (!jl_inside_heartbeat_thread()) { + jl_heartbeat_resume(); } - jl_safe_printf("==== Done\n"); } #ifdef __cplusplus diff --git a/src/threading.c b/src/threading.c index 9f5c18fe53555..e56aba2de51a1 100644 --- a/src/threading.c +++ b/src/threading.c @@ -1111,6 +1111,289 @@ JL_DLLEXPORT int jl_setaffinity(int16_t tid, char *mask, int cpumasksize) { return 0; // success } +// Heartbeat mechanism for Julia's task scheduler +// --- +// Start a thread that does not participate in running Julia's tasks. This +// thread simply sleeps until the heartbeat mechanism is enabled. When +// enabled, the heartbeat thread enters a loop in which it blocks waiting +// for the specified heartbeat interval. If, within that interval, +// `jl_heartbeat()` is *not* called at least once, then the thread calls +// `jl_print_task_backtraces(0)`. + +#ifdef JL_HEARTBEAT_THREAD + +#include + +volatile int heartbeat_enabled; +int heartbeat_tid; // Mostly used to ensure we skip this thread in the CPU profiler. XXX: not implemented on Windows +uv_thread_t heartbeat_uvtid; +uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread + heartbeat_off_sem; // thread -> jl_heartbeat_enable +int heartbeat_interval_s, + tasks_after_n, + reset_tasks_after_n; +int tasks_showed, n_hbs_missed, n_hbs_recvd; +_Atomic(int) heartbeats; + +JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT; +void jl_heartbeat_threadfun(void *arg); + +// start the heartbeat thread with heartbeats disabled +void jl_init_heartbeat(void) +{ + heartbeat_enabled = 0; + uv_sem_init(&heartbeat_on_sem, 0); + uv_sem_init(&heartbeat_off_sem, 0); + uv_thread_create(&heartbeat_uvtid, jl_heartbeat_threadfun, NULL); + uv_thread_detach(&heartbeat_uvtid); +} + +int jl_inside_heartbeat_thread(void) +{ + uv_thread_t curr_uvtid = uv_thread_self(); + return curr_uvtid == heartbeat_uvtid; +} + +// enable/disable heartbeats +// heartbeat_s: interval within which jl_heartbeat() must be called +// show_tasks_after_n: number of heartbeats missed before printing task backtraces +// reset_after_n: number of heartbeats after which to reset +// +// When disabling heartbeats, the heartbeat thread must wake up, +// find out that heartbeats are now disabled, and reset. For now, we +// handle this by preventing re-enabling of heartbeats until this +// completes. +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n, + int reset_after_n) +{ + if (heartbeat_s <= 0) { + heartbeat_enabled = 0; + heartbeat_interval_s = tasks_after_n = reset_tasks_after_n = 0; + } + else { + // must disable before enabling + if (heartbeat_enabled) { + return -1; + } + // heartbeat thread must be ready + if (uv_sem_trywait(&heartbeat_off_sem) != 0) { + return -1; + } + + jl_atomic_store_relaxed(&heartbeats, 0); + heartbeat_interval_s = heartbeat_s; + tasks_after_n = show_tasks_after_n; + reset_tasks_after_n = reset_after_n; + tasks_showed = 0; + n_hbs_missed = 0; + n_hbs_recvd = 0; + heartbeat_enabled = 1; + uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread + } + return 0; +} + +// temporarily pause the heartbeat thread +JL_DLLEXPORT int jl_heartbeat_pause(void) +{ + if (!heartbeat_enabled) { + return -1; + } + heartbeat_enabled = 0; + return 0; +} + +// resume the paused heartbeat thread +JL_DLLEXPORT int jl_heartbeat_resume(void) +{ + // cannot resume if the heartbeat thread is already running + if (heartbeat_enabled) { + return -1; + } + + // cannot resume if we weren't paused (disabled != paused) + if (heartbeat_interval_s == 0) { + return -1; + } + + // heartbeat thread must be ready + if (uv_sem_trywait(&heartbeat_off_sem) != 0) { + return -1; + } + + // reset state as we've been paused + n_hbs_missed = 0; + n_hbs_recvd = 0; + tasks_showed = 0; + + // resume + heartbeat_enabled = 1; + uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread + return 0; +} + +// heartbeat +JL_DLLEXPORT void jl_heartbeat(void) +{ + jl_atomic_fetch_add(&heartbeats, 1); +} + +// sleep the thread for the specified interval +void sleep_for(int secs, int nsecs) +{ + struct timespec rqtp, rmtp; + rqtp.tv_sec = secs; + rqtp.tv_nsec = nsecs; + rmtp.tv_sec = 0; + rmtp.tv_nsec = 0; + for (; ;) { + // this suspends the thread so we aren't using CPU + if (nanosleep(&rqtp, &rmtp) == 0) { + return; + } + // TODO: else if (errno == EINTR) + // this could be SIGTERM and we should shutdown but how to find out? + rqtp = rmtp; + } +} + +// check for heartbeats and maybe report loss +uint8_t check_heartbeats(uint8_t gc_state) +{ + int hb = jl_atomic_exchange(&heartbeats, 0); + + if (hb <= 0) { + // we didn't get a heartbeat + n_hbs_recvd = 0; + n_hbs_missed++; + + // if we've printed task backtraces already, do nothing + if (!tasks_showed) { + // otherwise, at least show this message + jl_safe_printf("==== heartbeat loss (%ds) ====\n", + n_hbs_missed * heartbeat_interval_s); + // if we've missed enough heartbeats, print task backtraces + if (n_hbs_missed >= tasks_after_n) { + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + + // exit GC-safe region to report then re-enter + jl_gc_safe_leave(ptls, gc_state); + jl_print_task_backtraces(0); + gc_state = jl_gc_safe_enter(ptls); + + // we printed task backtraces + tasks_showed = 1; + } + } + } + else { + // got a heartbeat + n_hbs_recvd++; + // if we'd printed task backtraces, check for reset + if (tasks_showed && n_hbs_recvd >= reset_tasks_after_n) { + tasks_showed = 0; + jl_safe_printf("==== heartbeats recovered (lost for %ds) ====\n", + n_hbs_missed * heartbeat_interval_s); + } + n_hbs_missed = 0; + } + + return gc_state; +} + +// heartbeat thread function +void jl_heartbeat_threadfun(void *arg) +{ + int s = 59, ns = 1e9 - 1, rs; + uint64_t t0, tchb; + + // We need a TLS because backtraces are accumulated into ptls->bt_size + // and ptls->bt_data, so we need to call jl_adopt_thread(). + jl_adopt_thread(); + (void)jl_atomic_fetch_add_relaxed(&n_threads_running, -1); + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + heartbeat_tid = ptls->tid; + + // Don't hold up GC, this thread doesn't participate. + uint8_t gc_state = jl_gc_safe_enter(ptls); + + for (;;) { + if (!heartbeat_enabled) { + // post the off semaphore to indicate we're ready to enable + uv_sem_post(&heartbeat_off_sem); + + // sleep the thread here; this semaphore is posted in + // jl_heartbeat_enable() or jl_heartbeat_resume() + uv_sem_wait(&heartbeat_on_sem); + + // Set the sleep duration. + s = heartbeat_interval_s - 1; + ns = 1e9 - 1; + continue; + } + + // heartbeat is enabled; sleep, waiting for the desired interval + sleep_for(s, ns); + + // if heartbeats were turned off/paused while we were sleeping, reset + if (!heartbeat_enabled) { + continue; + } + + // check if any heartbeats have happened, report as appropriate + t0 = jl_hrtime(); + gc_state = check_heartbeats(gc_state); + tchb = jl_hrtime() - t0; + + // adjust the next sleep duration based on how long the heartbeat + // check took, but if it took too long then use the normal duration + rs = 1; + while (tchb > 1e9) { + rs++; + tchb -= 1e9; + } + if (rs < heartbeat_interval_s) { + s = heartbeat_interval_s - rs; + } + ns = 1e9 - tchb; + } +} + +#else // !JL_HEARTBEAT_THREAD + +void jl_init_heartbeat(void) +{ +} + +int jl_inside_heartbeat_thread(void) +{ + return 0; +} + +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n, + int reset_after_n) +{ + return -1; +} + +JL_DLLEXPORT int jl_heartbeat_pause(void) +{ + return -1; +} + +JL_DLLEXPORT int jl_heartbeat_resume(void) +{ + return -1; +} + +JL_DLLEXPORT void jl_heartbeat(void) +{ +} + +#endif // JL_HEARTBEAT_THREAD + #ifdef __cplusplus } #endif