Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/docs/usage/sdk/how-to-use.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ If using Node.js you can import the library with:
const { StreamrClient } = require('@streamr/sdk');
```

### Cleaning up

After the StreamrClient is no longer needed or the process is shutting down it is very important to destroy the StreamrClient:

```js
const client = new StreamrClient()
await client.destroy()
```

### Environments and frameworks

#### Node.js
Expand Down
26 changes: 25 additions & 1 deletion packages/node/bin/streamr-node.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env node
import { program } from 'commander'
import pkg from '../package.json'

import { Logger } from '@streamr/utils'
import { createBroker } from '../src/broker'
import { readConfigAndMigrateIfNeeded } from '../src/config/migration'
import { overrideConfigToEnvVarsIfGiven } from '../src/config/config'

const logger = new Logger(module)

program
.version(pkg.version)
.name('streamr-node')
Expand All @@ -17,6 +19,28 @@ program
const config = readConfigAndMigrateIfNeeded(configFile)
overrideConfigToEnvVarsIfGiven(config)
const broker = await createBroker(config)

// Set up graceful shutdown handlers
const shutdown = async (exitCode: number) => {
await broker.stop()
process.exit(exitCode)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Missing guard allows concurrent broker shutdown attempts

The shutdown function lacks a guard against concurrent invocations. If multiple signals arrive rapidly (e.g., quick successive Ctrl+C presses), the function can be triggered multiple times before process.exit() is called. This leads to concurrent broker.stop() calls, which can cause stopServer and plugin stop() methods to be invoked concurrently. The old NetworkStack.stop() had a stopped flag guard that prevented this, but the new broker-level stop() has no such protection.

Additional Locations (1)

Fix in Cursor Fix in Web


const exitEvents = ['SIGINT', 'SIGTERM', 'SIGUSR1', 'SIGUSR2']
exitEvents.forEach((event) => {
process.on(event, () => shutdown(0))
})

process.on('uncaughtException', (err) => {
logger.fatal('Encountered uncaughtException', { err })
shutdown(1)
})

process.on('unhandledRejection', (err) => {
logger.fatal('Encountered unhandledRejection', { err })
shutdown(1)
})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Unhandled rejection in error handlers may cause recursion

The shutdown async function is called without awaiting and without a .catch() handler in both the uncaughtException and unhandledRejection listeners. If broker.stop() throws during shutdown, the resulting promise rejection goes unhandled. This is particularly problematic in the unhandledRejection handler: if shutdown(1) fails, it creates another unhandled rejection, which triggers the same handler again, leading to infinite recursion until the process crashes or exceeds resource limits.

Fix in Cursor Fix in Web


if (!program.opts().test) {
await broker.start()
} else {
Expand Down
10 changes: 0 additions & 10 deletions packages/node/src/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,3 @@ export const createBroker = async (configWithoutDefaults: Config): Promise<Broke
}
}
}

process.on('uncaughtException', (err) => {
logger.fatal('Encountered uncaughtException', { err })
process.exit(1)
})

process.on('unhandledRejection', (err) => {
logger.fatal('Encountered unhandledRejection', { err })
process.exit(1)
})
7 changes: 7 additions & 0 deletions packages/sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ streamr.resend(streamId, { last: 10 }, (msgs) => {
});
```

### Clean up
After the `StreamrClient` is no longer used or the process is shutting down it is very important to call `destroy()` on the `StreamrClient`. This ensures that the network node of the client will be shutdown gracefully.

```js
await streamr.destroy()
```

___

**This Readme only scratches the surface of what's possible - be sure to [checkout our documentation](https://docs.streamr.network) for the full usage instructions.**
28 changes: 0 additions & 28 deletions packages/trackerless-network/src/NetworkStack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
toNodeId
} from '@streamr/dht'
import { Logger, MetricsContext, StreamID, StreamPartID, toStreamPartID, until } from '@streamr/utils'
import pull from 'lodash/pull'
import { version as applicationVersion } from '../package.json'
import { ContentDeliveryManager, ContentDeliveryManagerOptions, StreamPartDeliveryOptions } from './ContentDeliveryManager'
import { ControlLayerNode } from './control-layer/ControlLayerNode'
Expand All @@ -25,31 +24,6 @@ export interface NetworkOptions {

const logger = new Logger(module)

const instances: NetworkStack[] = []
const stopInstances = async () => {
// make a clone so that it is ok for each instance.stop() to remove itself from the list (at line 139)
// while the map function is iterating the list
const clonedInstances = [...instances]
await Promise.all(clonedInstances.map((instance) => instance.stop()))
}
const EXIT_EVENTS = [`exit`, `SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `unhandledRejection`, `SIGTERM`]
EXIT_EVENTS.forEach((event) => {
process.on(event, async (eventArg) => {
const isError = (event === 'uncaughtException') || (event === 'unhandledRejection')
if (isError) {
logger.error(`exit event: ${event}`, eventArg)
}
await stopInstances()
process.exit(isError ? 1 : 0)
})
})
declare let window: any
if (typeof window === 'object') {
window.addEventListener('unload', async () => {
await stopInstances()
})
}

export class NetworkStack {

private controlLayerNode?: ControlLayerNode
Expand All @@ -72,7 +46,6 @@ export class NetworkStack {
...options.networkNode,
metricsContext: this.metricsContext
})
instances.push(this)
}

async joinStreamPart(
Expand Down Expand Up @@ -190,7 +163,6 @@ export class NetworkStack {
async stop(): Promise<void> {
if (!this.stopped) {
this.stopped = true
pull(instances, this)
await this.contentDeliveryManager!.destroy()
await this.controlLayerNode!.stop()
this.contentDeliveryManager = undefined
Expand Down