diff --git a/libs/providers/flagd/src/lib/configuration.spec.ts b/libs/providers/flagd/src/lib/configuration.spec.ts index 57b0936f1..a075677ca 100644 --- a/libs/providers/flagd/src/lib/configuration.spec.ts +++ b/libs/providers/flagd/src/lib/configuration.spec.ts @@ -25,6 +25,7 @@ describe('Configuration', () => { resolverType: 'rpc', selector: '', deadlineMs: 500, + streamDeadlineMs: 600000, contextEnricher: expect.any(Function), }); }); @@ -41,6 +42,7 @@ describe('Configuration', () => { const selector = 'app=weather'; const offlineFlagSourcePath = '/tmp/flags.json'; const defaultAuthority = 'test-authority'; + const streamDeadlineMs = 700000; process.env['FLAGD_HOST'] = host; process.env['FLAGD_PORT'] = `${port}`; @@ -53,6 +55,7 @@ describe('Configuration', () => { process.env['FLAGD_RESOLVER'] = `${resolverType}`; process.env['FLAGD_OFFLINE_FLAG_SOURCE_PATH'] = offlineFlagSourcePath; process.env['FLAGD_DEFAULT_AUTHORITY'] = defaultAuthority; + process.env['FLAGD_STREAM_DEADLINE_MS'] = `${streamDeadlineMs}`; expect(getConfig()).toEqual( expect.objectContaining({ @@ -68,6 +71,7 @@ describe('Configuration', () => { offlineFlagSourcePath, defaultAuthority, deadlineMs: 500, + streamDeadlineMs, }), ); }); @@ -113,6 +117,7 @@ describe('Configuration', () => { selector: '', defaultAuthority: '', deadlineMs: 500, + streamDeadlineMs: 600000, contextEnricher: contextEnricher, }; diff --git a/libs/providers/flagd/src/lib/configuration.ts b/libs/providers/flagd/src/lib/configuration.ts index c1804a46f..087985ab3 100644 --- a/libs/providers/flagd/src/lib/configuration.ts +++ b/libs/providers/flagd/src/lib/configuration.ts @@ -104,10 +104,20 @@ interface FlagdConfig extends Config { contextEnricher: (syncContext: EvaluationContext | null) => EvaluationContext; } -export type FlagdProviderOptions = Partial; +export interface FlagdGrpcConfig extends Config { + /** + * The deadline for streaming connections. + * + * @default 600000 + */ + streamDeadlineMs: number; +} + +export type FlagdProviderOptions = Partial; -const DEFAULT_CONFIG: Omit = { +const DEFAULT_CONFIG: Omit = { deadlineMs: 500, + streamDeadlineMs: 600000, host: 'localhost', tls: false, selector: '', @@ -125,6 +135,7 @@ enum ENV_VAR { FLAGD_PORT = 'FLAGD_PORT', FLAGD_SYNC_PORT = 'FLAGD_SYNC_PORT', FLAGD_DEADLINE_MS = 'FLAGD_DEADLINE_MS', + FLAGD_STREAM_DEADLINE_MS = 'FLAGD_STREAM_DEADLINE_MS', FLAGD_TLS = 'FLAGD_TLS', FLAGD_SOCKET_PATH = 'FLAGD_SOCKET_PATH', FLAGD_SERVER_CERT_PATH = 'FLAGD_SERVER_CERT_PATH', @@ -144,7 +155,7 @@ function checkEnvVarResolverType() { ); } -const getEnvVarConfig = (): Partial => { +const getEnvVarConfig = (): Partial => { let provider = undefined; if ( process.env[ENV_VAR.FLAGD_RESOLVER] && @@ -167,6 +178,9 @@ const getEnvVarConfig = (): Partial => { ...(Number(process.env[ENV_VAR.FLAGD_DEADLINE_MS]) && { deadlineMs: Number(process.env[ENV_VAR.FLAGD_DEADLINE_MS]), }), + ...(Number(process.env[ENV_VAR.FLAGD_STREAM_DEADLINE_MS]) && { + streamDeadlineMs: Number(process.env[ENV_VAR.FLAGD_STREAM_DEADLINE_MS]), + }), ...(process.env[ENV_VAR.FLAGD_TLS] && { tls: process.env[ENV_VAR.FLAGD_TLS]?.toLowerCase() === 'true', }), @@ -197,7 +211,7 @@ const getEnvVarConfig = (): Partial => { }; }; -export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig { +export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig & FlagdGrpcConfig { const envVarConfig = getEnvVarConfig(); const defaultConfig = options.resolverType == 'in-process' || envVarConfig.resolverType == 'in-process' @@ -207,5 +221,6 @@ export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig { ...defaultConfig, ...envVarConfig, ...options, + streamDeadlineMs: options.streamDeadlineMs ?? envVarConfig.streamDeadlineMs ?? DEFAULT_CONFIG.streamDeadlineMs, }; } diff --git a/libs/providers/flagd/src/lib/flagd-provider.spec.ts b/libs/providers/flagd/src/lib/flagd-provider.spec.ts index 07ada6970..ee0250250 100644 --- a/libs/providers/flagd/src/lib/flagd-provider.spec.ts +++ b/libs/providers/flagd/src/lib/flagd-provider.spec.ts @@ -144,7 +144,10 @@ describe(FlagdProvider.name, () => { new FlagdProvider( undefined, undefined, - new GRPCService({ deadlineMs: 100, host: '', port: 123, tls: false }, basicServiceClientMock), + new GRPCService( + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false }, + basicServiceClientMock, + ), ), ); client = OpenFeature.getClient('basic test'); @@ -302,7 +305,7 @@ describe(FlagdProvider.name, () => { undefined, undefined, new GRPCService( - { deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' }, + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' }, streamingServiceClientMock, ), ) @@ -330,7 +333,7 @@ describe(FlagdProvider.name, () => { undefined, undefined, new GRPCService( - { deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' }, + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' }, streamingServiceClientMock, ), ), @@ -382,7 +385,7 @@ describe(FlagdProvider.name, () => { undefined, undefined, new GRPCService( - { deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' }, + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' }, streamingServiceClientMock, ), ), @@ -417,7 +420,7 @@ describe(FlagdProvider.name, () => { undefined, undefined, new GRPCService( - { deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' }, + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' }, streamingServiceClientMock, ), ), @@ -509,7 +512,7 @@ describe(FlagdProvider.name, () => { undefined, undefined, new GRPCService( - { deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' }, + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' }, streamingServiceClientMock, ), ); @@ -589,7 +592,10 @@ describe(FlagdProvider.name, () => { new FlagdProvider( undefined, undefined, - new GRPCService({ deadlineMs: 100, host: '', port: 123, tls: false }, errServiceClientMock), + new GRPCService( + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false }, + errServiceClientMock, + ), ), ); client = OpenFeature.getClient('errors test'); @@ -676,7 +682,10 @@ describe(FlagdProvider.name, () => { new FlagdProvider( undefined, undefined, - new GRPCService({ deadlineMs: 100, host: '', port: 123, tls: false }, errServiceClientMock), + new GRPCService( + { deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false }, + errServiceClientMock, + ), ), ); }); diff --git a/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts b/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts index 391b0dcaa..7cf7b601f 100644 --- a/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts +++ b/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts @@ -26,7 +26,7 @@ import type { ResolveStringResponse, } from '../../../proto/ts/flagd/evaluation/v1/evaluation'; import { ServiceClient } from '../../../proto/ts/flagd/evaluation/v1/evaluation'; -import type { Config } from '../../configuration'; +import type { FlagdGrpcConfig } from '../../configuration'; import { DEFAULT_MAX_CACHE_SIZE, EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from '../../constants'; import { FlagdProvider } from '../../flagd-provider'; import type { Service } from '../service'; @@ -69,6 +69,7 @@ export class GRPCService implements Service { private _cacheEnabled = false; private _eventStream: ClientReadableStream | undefined = undefined; private _deadline: number; + private _streamDeadline: number; private get _cacheActive() { // the cache is "active" (able to be used) if the config enabled it, AND the gRPC stream is live @@ -76,7 +77,7 @@ export class GRPCService implements Service { } constructor( - config: Config, + config: FlagdGrpcConfig, client?: ServiceClient, private logger?: Logger, ) { @@ -94,6 +95,7 @@ export class GRPCService implements Service { ? client : new ServiceClient(socketPath ? `unix://${socketPath}` : `${host}:${port}`, channelCredentials, clientOptions); this._deadline = config.deadlineMs; + this._streamDeadline = config.streamDeadlineMs; if (config.cache === 'lru') { this._cacheEnabled = true; @@ -211,7 +213,7 @@ export class GRPCService implements Service { disconnectCallback: (message: string) => void, ) { const channel = this._client.getChannel(); - channel.watchConnectivityState(channel.getConnectivityState(true), Infinity, () => { + channel.watchConnectivityState(channel.getConnectivityState(true), this._streamDeadline, () => { this.listen(reconnectCallback, changedCallback, disconnectCallback); }); }