Skip to content

Commit 4c69a66

Browse files
committed
feat: Allow passing state from tracked threads
1 parent f4d8eeb commit 4c69a66

File tree

8 files changed

+154
-50
lines changed

8 files changed

+154
-50
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ build/
33
lib/
44
/*.tgz
55
test/yarn.lock
6+
test/package.json

README.md

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ registerThread();
1616
Watchdog thread:
1717

1818
```ts
19-
const { captureStackTrace } = require("@sentry-internal/node-native-stacktrace");
19+
const {
20+
captureStackTrace,
21+
} = require("@sentry-internal/node-native-stacktrace");
2022

2123
const stacks = captureStackTrace();
2224
console.log(stacks);
@@ -87,15 +89,20 @@ In the main or worker threads if you call `registerThread()` regularly, times
8789
are recorded.
8890

8991
```ts
90-
const { registerThread } = require("@sentry-internal/node-native-stacktrace");
92+
const {
93+
registerThread,
94+
threadPoll,
95+
} = require("@sentry-internal/node-native-stacktrace");
96+
97+
registerThread();
9198

9299
setInterval(() => {
93-
registerThread();
100+
threadPoll({ optional_state: "some_value" });
94101
}, 200);
95102
```
96103

97104
In the watchdog thread you can call `getThreadsLastSeen()` to get how long it's
98-
been in milliseconds since each thread registered.
105+
been in milliseconds since each thread polled.
99106

100107
If any thread has exceeded a threshold, you can call `captureStackTrace()` to
101108
get the stack traces for all threads.
@@ -111,11 +118,13 @@ const THRESHOLD = 1000; // 1 second
111118
setInterval(() => {
112119
for (const [thread, time] in Object.entries(getThreadsLastSeen())) {
113120
if (time > THRESHOLD) {
114-
const stacks = captureStackTrace();
115-
const blockedThread = stacks[thread];
121+
const threads = captureStackTrace();
122+
const blockedThread = threads[thread];
123+
const { frames, state } = blockedThread;
116124
console.log(
117125
`Thread '${thread}' blocked more than ${THRESHOLD}ms`,
118-
blockedThread,
126+
frames,
127+
state,
119128
);
120129
}
121130
}

module.cc

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <future>
33
#include <mutex>
44
#include <node.h>
5+
#include <sstream>
56

67
using namespace v8;
78
using namespace node;
@@ -15,6 +16,8 @@ struct ThreadInfo {
1516
std::string thread_name;
1617
// Last time this thread was seen in milliseconds since epoch
1718
milliseconds last_seen;
19+
// Some JSON serialized state for the thread
20+
std::string state;
1821
};
1922

2023
static std::mutex threads_mutex;
@@ -32,6 +35,12 @@ struct JsStackFrame {
3235
// Type alias for a vector of JsStackFrame
3336
using JsStackTrace = std::vector<JsStackFrame>;
3437

38+
struct ThreadResult {
39+
std::string thread_name;
40+
std::string state;
41+
JsStackTrace stack_frames;
42+
};
43+
3544
// Function to be called when an isolate's execution is interrupted
3645
static void ExecutionInterrupted(Isolate *isolate, void *data) {
3746
auto promise = static_cast<std::promise<JsStackTrace> *>(data);
@@ -91,7 +100,6 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
91100
auto capture_from_isolate = args.GetIsolate();
92101
auto current_context = capture_from_isolate->GetCurrentContext();
93102

94-
using ThreadResult = std::tuple<std::string, JsStackTrace>;
95103
std::vector<std::future<ThreadResult>> futures;
96104

97105
{
@@ -100,35 +108,39 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
100108
if (thread_isolate == capture_from_isolate)
101109
continue;
102110
auto thread_name = thread_info.thread_name;
111+
auto state = thread_info.state;
103112

104113
futures.emplace_back(std::async(
105114
std::launch::async,
106-
[thread_name](Isolate *isolate) -> ThreadResult {
107-
return std::make_tuple(thread_name, CaptureStackTrace(isolate));
115+
[thread_name, state](Isolate *isolate) -> ThreadResult {
116+
return {thread_name, state,
117+
.stack_frames = CaptureStackTrace(isolate)};
108118
},
109119
thread_isolate));
110120
}
111121
}
112122

113-
Local<Object> result = Object::New(capture_from_isolate);
123+
Local<Object> output = Object::New(capture_from_isolate);
114124

115125
for (auto &future : futures) {
116-
auto [thread_name, frames] = future.get();
117-
auto key = String::NewFromUtf8(capture_from_isolate, thread_name.c_str(),
118-
NewStringType::kNormal)
119-
.ToLocalChecked();
120-
121-
Local<Array> jsFrames = Array::New(capture_from_isolate, frames.size());
122-
for (size_t i = 0; i < frames.size(); ++i) {
123-
const auto &f = frames[i];
126+
auto result = future.get();
127+
auto key =
128+
String::NewFromUtf8(capture_from_isolate, result.thread_name.c_str(),
129+
NewStringType::kNormal)
130+
.ToLocalChecked();
131+
132+
Local<Array> jsFrames =
133+
Array::New(capture_from_isolate, result.stack_frames.size());
134+
for (size_t i = 0; i < result.stack_frames.size(); ++i) {
135+
const auto &frame = result.stack_frames[i];
124136
Local<Object> frameObj = Object::New(capture_from_isolate);
125137
frameObj
126138
->Set(current_context,
127139
String::NewFromUtf8(capture_from_isolate, "function",
128140
NewStringType::kInternalized)
129141
.ToLocalChecked(),
130142
String::NewFromUtf8(capture_from_isolate,
131-
f.function_name.c_str(),
143+
frame.function_name.c_str(),
132144
NewStringType::kNormal)
133145
.ToLocalChecked())
134146
.Check();
@@ -137,7 +149,8 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
137149
String::NewFromUtf8(capture_from_isolate, "filename",
138150
NewStringType::kInternalized)
139151
.ToLocalChecked(),
140-
String::NewFromUtf8(capture_from_isolate, f.filename.c_str(),
152+
String::NewFromUtf8(capture_from_isolate,
153+
frame.filename.c_str(),
141154
NewStringType::kNormal)
142155
.ToLocalChecked())
143156
.Check();
@@ -146,23 +159,52 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
146159
String::NewFromUtf8(capture_from_isolate, "lineno",
147160
NewStringType::kInternalized)
148161
.ToLocalChecked(),
149-
Integer::New(capture_from_isolate, f.lineno))
162+
Integer::New(capture_from_isolate, frame.lineno))
150163
.Check();
151164
frameObj
152165
->Set(current_context,
153166
String::NewFromUtf8(capture_from_isolate, "colno",
154167
NewStringType::kInternalized)
155168
.ToLocalChecked(),
156-
Integer::New(capture_from_isolate, f.colno))
169+
Integer::New(capture_from_isolate, frame.colno))
157170
.Check();
158171
jsFrames->Set(current_context, static_cast<uint32_t>(i), frameObj)
159172
.Check();
160173
}
161174

162-
result->Set(current_context, key, jsFrames).Check();
175+
// Create a thread object with a 'frames' property and optional 'state'
176+
Local<Object> threadObj = Object::New(capture_from_isolate);
177+
threadObj
178+
->Set(current_context,
179+
String::NewFromUtf8(capture_from_isolate, "frames",
180+
NewStringType::kInternalized)
181+
.ToLocalChecked(),
182+
jsFrames)
183+
.Check();
184+
185+
if (!result.state.empty()) {
186+
v8::MaybeLocal<v8::String> stateStr = v8::String::NewFromUtf8(
187+
capture_from_isolate, result.state.c_str(), NewStringType::kNormal);
188+
if (!stateStr.IsEmpty()) {
189+
v8::MaybeLocal<v8::Value> maybeStateVal =
190+
v8::JSON::Parse(current_context, stateStr.ToLocalChecked());
191+
v8::Local<v8::Value> stateVal;
192+
if (maybeStateVal.ToLocal(&stateVal)) {
193+
threadObj
194+
->Set(current_context,
195+
String::NewFromUtf8(capture_from_isolate, "state",
196+
NewStringType::kInternalized)
197+
.ToLocalChecked(),
198+
stateVal)
199+
.Check();
200+
}
201+
}
202+
}
203+
204+
output->Set(current_context, key, threadObj).Check();
163205
}
164206

165-
args.GetReturnValue().Set(result);
207+
args.GetReturnValue().Set(output);
166208
}
167209

168210
// Cleanup function to remove the thread from the map when the isolate is
@@ -179,9 +221,9 @@ void RegisterThread(const FunctionCallbackInfo<Value> &args) {
179221

180222
if (args.Length() != 1 || !args[0]->IsString()) {
181223
isolate->ThrowException(Exception::Error(
182-
String::NewFromUtf8(
183-
isolate, "registerThread(name) requires a single name argument",
184-
NewStringType::kInternalized)
224+
String::NewFromUtf8(isolate,
225+
"threadStart(name) requires a single name argument",
226+
NewStringType::kInternalized)
185227
.ToLocalChecked()));
186228

187229
return;
@@ -194,13 +236,39 @@ void RegisterThread(const FunctionCallbackInfo<Value> &args) {
194236
std::lock_guard<std::mutex> lock(threads_mutex);
195237
auto found = threads.find(isolate);
196238
if (found == threads.end()) {
197-
threads.emplace(isolate, ThreadInfo{thread_name, milliseconds::zero()});
239+
threads.emplace(
240+
isolate, ThreadInfo{thread_name, milliseconds::zero(), .state = ""});
198241
// Register a cleanup hook to remove this thread when the isolate is
199242
// destroyed
200243
node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate);
244+
}
245+
}
246+
}
247+
248+
// Function to track a thread and set its state
249+
void ThreadPoll(const FunctionCallbackInfo<Value> &args) {
250+
auto isolate = args.GetIsolate();
251+
auto context = isolate->GetCurrentContext();
252+
253+
std::string state_str;
254+
if (args.Length() == 1 && args[0]->IsValue()) {
255+
MaybeLocal<String> maybe_json = v8::JSON::Stringify(context, args[0]);
256+
if (!maybe_json.IsEmpty()) {
257+
v8::String::Utf8Value utf8_state(isolate, maybe_json.ToLocalChecked());
258+
state_str = *utf8_state ? *utf8_state : "";
201259
} else {
260+
state_str = "";
261+
}
262+
} else {
263+
state_str = "";
264+
}
265+
266+
{
267+
std::lock_guard<std::mutex> lock(threads_mutex);
268+
auto found = threads.find(isolate);
269+
if (found != threads.end()) {
202270
auto &thread_info = found->second;
203-
thread_info.thread_name = thread_name;
271+
thread_info.state = state_str;
204272
thread_info.last_seen =
205273
duration_cast<milliseconds>(system_clock::now().time_since_epoch());
206274
}
@@ -257,6 +325,16 @@ NODE_MODULE_INITIALIZER(Local<Object> exports, Local<Value> module,
257325
.ToLocalChecked())
258326
.Check();
259327

328+
exports
329+
->Set(context,
330+
String::NewFromUtf8(isolate, "threadPoll",
331+
NewStringType::kInternalized)
332+
.ToLocalChecked(),
333+
FunctionTemplate::New(isolate, ThreadPoll)
334+
->GetFunction(context)
335+
.ToLocalChecked())
336+
.Check();
337+
260338
exports
261339
->Set(context,
262340
String::NewFromUtf8(isolate, "getThreadsLastSeen",

src/index.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ const arch = process.env['BUILD_ARCH'] || _arch();
1111
const abi = getAbi(versions.node, 'node');
1212
const identifier = [platform, arch, stdlib, abi].filter(c => c !== undefined && c !== null).join('-');
1313

14+
type Thread<S = unknown> = {
15+
frames: StackFrame[];
16+
state?: S
17+
}
18+
1419
type StackFrame = {
1520
function: string;
1621
filename: string;
@@ -20,7 +25,8 @@ type StackFrame = {
2025

2126
interface Native {
2227
registerThread(threadName: string): void;
23-
captureStackTrace(): Record<string, StackFrame[]>;
28+
threadPoll(state?: object): void;
29+
captureStackTrace<S = unknown>(): Record<string, Thread<S>>;
2430
getThreadsLastSeen(): Record<string, number>;
2531
}
2632

@@ -177,11 +183,24 @@ export function registerThread(threadName: string = String(threadId)): void {
177183
native.registerThread(threadName);
178184
}
179185

186+
/**
187+
* Tells the native module that the thread is still running and updates the state.
188+
*
189+
* @param state Optional state to pass to the native module.
190+
*/
191+
export function threadPoll(state?: object): void {
192+
if (typeof state === 'object') {
193+
native.threadPoll(state);
194+
} else {
195+
native.threadPoll();
196+
}
197+
}
198+
180199
/**
181200
* Captures stack traces for all registered threads.
182201
*/
183-
export function captureStackTrace(): Record<string, StackFrame[]> {
184-
return native.captureStackTrace();
202+
export function captureStackTrace<S = unknown>(): Record<string, Thread<S>> {
203+
return native.captureStackTrace<S>();
185204
}
186205

187206
/**

test/e2e.test.mjs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
1313

1414
const stacks = JSON.parse(result.stdout.toString());
1515

16-
expect(stacks['0']).toEqual(expect.arrayContaining([
16+
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
1717
{
1818
function: 'pbkdf2Sync',
1919
filename: expect.any(String),
@@ -34,7 +34,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
3434
},
3535
]));
3636

37-
expect(stacks['2']).toEqual(expect.arrayContaining([
37+
expect(stacks['2'].frames).toEqual(expect.arrayContaining([
3838
{
3939
function: 'pbkdf2Sync',
4040
filename: expect.any(String),
@@ -64,7 +64,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
6464

6565
const stacks = JSON.parse(result.stdout.toString());
6666

67-
expect(stacks['0']).toEqual(expect.arrayContaining([
67+
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
6868
{
6969
function: 'pbkdf2Sync',
7070
filename: expect.any(String),
@@ -85,6 +85,8 @@ describe('e2e Tests', { timeout: 20000 }, () => {
8585
},
8686
]));
8787

88-
expect(stacks['2'].length).toEqual(1);
88+
expect(stacks['0'].state).toEqual({ some_property: 'some_value' });
89+
90+
expect(stacks['2'].frames.length).toEqual(1);
8991
});
9092
});

test/package.json

Lines changed: 0 additions & 7 deletions
This file was deleted.

test/stalled.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
const { Worker } = require('node:worker_threads');
22
const { longWork } = require('./long-work.js');
3-
const { registerThread } = require('@sentry-internal/node-native-stacktrace');
3+
const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace');
4+
5+
registerThread();
46

57
setInterval(() => {
6-
registerThread();
8+
threadPoll({ some_property: 'some_value' });
79
}, 200).unref();
810

911
const watchdog = new Worker('./test/stalled-watchdog.js');

0 commit comments

Comments
 (0)