Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libs/providers/flagd/src/lib/configuration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ describe('Configuration', () => {
resolverType: 'rpc',
selector: '',
deadlineMs: 500,
streamDeadlineMs: 600000,
contextEnricher: expect.any(Function),
});
});
Expand All @@ -41,6 +42,7 @@ describe('Configuration', () => {
const selector = 'app=weather';
const offlineFlagSourcePath = '/tmp/flags.json';
const defaultAuthority = 'test-authority';
const streamDeadlineMs = 700000;

process.env['FLAGD_HOST'] = host;
process.env['FLAGD_PORT'] = `${port}`;
Expand All @@ -53,6 +55,7 @@ describe('Configuration', () => {
process.env['FLAGD_RESOLVER'] = `${resolverType}`;
process.env['FLAGD_OFFLINE_FLAG_SOURCE_PATH'] = offlineFlagSourcePath;
process.env['FLAGD_DEFAULT_AUTHORITY'] = defaultAuthority;
process.env['FLAGD_STREAM_DEADLINE_MS'] = `${streamDeadlineMs}`;

expect(getConfig()).toEqual(
expect.objectContaining({
Expand All @@ -68,6 +71,7 @@ describe('Configuration', () => {
offlineFlagSourcePath,
defaultAuthority,
deadlineMs: 500,
streamDeadlineMs,
}),
);
});
Expand Down Expand Up @@ -113,6 +117,7 @@ describe('Configuration', () => {
selector: '',
defaultAuthority: '',
deadlineMs: 500,
streamDeadlineMs: 600000,
contextEnricher: contextEnricher,
};

Expand Down
23 changes: 19 additions & 4 deletions libs/providers/flagd/src/lib/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,20 @@ interface FlagdConfig extends Config {
contextEnricher: (syncContext: EvaluationContext | null) => EvaluationContext;
}

export type FlagdProviderOptions = Partial<FlagdConfig>;
export interface FlagdGrpcConfig extends Config {
/**
* The deadline for streaming connections.
*
* @default 600000
*/
streamDeadlineMs: number;
}

export type FlagdProviderOptions = Partial<FlagdConfig & FlagdGrpcConfig>;

const DEFAULT_CONFIG: Omit<FlagdConfig, 'port' | 'resolverType'> = {
const DEFAULT_CONFIG: Omit<FlagdConfig & FlagdGrpcConfig, 'port' | 'resolverType'> = {
deadlineMs: 500,
streamDeadlineMs: 600000,
host: 'localhost',
tls: false,
selector: '',
Expand All @@ -125,6 +135,7 @@ enum ENV_VAR {
FLAGD_PORT = 'FLAGD_PORT',
FLAGD_SYNC_PORT = 'FLAGD_SYNC_PORT',
FLAGD_DEADLINE_MS = 'FLAGD_DEADLINE_MS',
FLAGD_STREAM_DEADLINE_MS = 'FLAGD_STREAM_DEADLINE_MS',
FLAGD_TLS = 'FLAGD_TLS',
FLAGD_SOCKET_PATH = 'FLAGD_SOCKET_PATH',
FLAGD_SERVER_CERT_PATH = 'FLAGD_SERVER_CERT_PATH',
Expand All @@ -144,7 +155,7 @@ function checkEnvVarResolverType() {
);
}

const getEnvVarConfig = (): Partial<Config> => {
const getEnvVarConfig = (): Partial<Config & FlagdGrpcConfig> => {
let provider = undefined;
if (
process.env[ENV_VAR.FLAGD_RESOLVER] &&
Expand All @@ -167,6 +178,9 @@ const getEnvVarConfig = (): Partial<Config> => {
...(Number(process.env[ENV_VAR.FLAGD_DEADLINE_MS]) && {
deadlineMs: Number(process.env[ENV_VAR.FLAGD_DEADLINE_MS]),
}),
...(Number(process.env[ENV_VAR.FLAGD_STREAM_DEADLINE_MS]) && {
streamDeadlineMs: Number(process.env[ENV_VAR.FLAGD_STREAM_DEADLINE_MS]),
}),
...(process.env[ENV_VAR.FLAGD_TLS] && {
tls: process.env[ENV_VAR.FLAGD_TLS]?.toLowerCase() === 'true',
}),
Expand Down Expand Up @@ -197,7 +211,7 @@ const getEnvVarConfig = (): Partial<Config> => {
};
};

export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig {
export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig & FlagdGrpcConfig {
const envVarConfig = getEnvVarConfig();
const defaultConfig =
options.resolverType == 'in-process' || envVarConfig.resolverType == 'in-process'
Expand All @@ -207,5 +221,6 @@ export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig {
...defaultConfig,
...envVarConfig,
...options,
streamDeadlineMs: options.streamDeadlineMs ?? envVarConfig.streamDeadlineMs ?? DEFAULT_CONFIG.streamDeadlineMs,
};
}
25 changes: 17 additions & 8 deletions libs/providers/flagd/src/lib/flagd-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ describe(FlagdProvider.name, () => {
new FlagdProvider(
undefined,
undefined,
new GRPCService({ deadlineMs: 100, host: '', port: 123, tls: false }, basicServiceClientMock),
new GRPCService(
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false },
basicServiceClientMock,
),
),
);
client = OpenFeature.getClient('basic test');
Expand Down Expand Up @@ -302,7 +305,7 @@ describe(FlagdProvider.name, () => {
undefined,
undefined,
new GRPCService(
{ deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' },
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' },
streamingServiceClientMock,
),
)
Expand Down Expand Up @@ -330,7 +333,7 @@ describe(FlagdProvider.name, () => {
undefined,
undefined,
new GRPCService(
{ deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' },
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' },
streamingServiceClientMock,
),
),
Expand Down Expand Up @@ -382,7 +385,7 @@ describe(FlagdProvider.name, () => {
undefined,
undefined,
new GRPCService(
{ deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' },
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' },
streamingServiceClientMock,
),
),
Expand Down Expand Up @@ -417,7 +420,7 @@ describe(FlagdProvider.name, () => {
undefined,
undefined,
new GRPCService(
{ deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' },
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' },
streamingServiceClientMock,
),
),
Expand Down Expand Up @@ -509,7 +512,7 @@ describe(FlagdProvider.name, () => {
undefined,
undefined,
new GRPCService(
{ deadlineMs: 100, host: '', port: 123, tls: false, cache: 'lru' },
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false, cache: 'lru' },
streamingServiceClientMock,
),
);
Expand Down Expand Up @@ -589,7 +592,10 @@ describe(FlagdProvider.name, () => {
new FlagdProvider(
undefined,
undefined,
new GRPCService({ deadlineMs: 100, host: '', port: 123, tls: false }, errServiceClientMock),
new GRPCService(
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false },
errServiceClientMock,
),
),
);
client = OpenFeature.getClient('errors test');
Expand Down Expand Up @@ -676,7 +682,10 @@ describe(FlagdProvider.name, () => {
new FlagdProvider(
undefined,
undefined,
new GRPCService({ deadlineMs: 100, host: '', port: 123, tls: false }, errServiceClientMock),
new GRPCService(
{ deadlineMs: 100, streamDeadlineMs: 600000, host: '', port: 123, tls: false },
errServiceClientMock,
),
),
);
});
Expand Down
8 changes: 5 additions & 3 deletions libs/providers/flagd/src/lib/service/grpc/grpc-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type {
ResolveStringResponse,
} from '../../../proto/ts/flagd/evaluation/v1/evaluation';
import { ServiceClient } from '../../../proto/ts/flagd/evaluation/v1/evaluation';
import type { Config } from '../../configuration';
import type { FlagdGrpcConfig } from '../../configuration';
import { DEFAULT_MAX_CACHE_SIZE, EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from '../../constants';
import { FlagdProvider } from '../../flagd-provider';
import type { Service } from '../service';
Expand Down Expand Up @@ -69,14 +69,15 @@ export class GRPCService implements Service {
private _cacheEnabled = false;
private _eventStream: ClientReadableStream<EventStreamResponse> | undefined = undefined;
private _deadline: number;
private _streamDeadline: number;

private get _cacheActive() {
// the cache is "active" (able to be used) if the config enabled it, AND the gRPC stream is live
return this._cacheEnabled && this._client.getChannel().getConnectivityState(false) === ConnectivityState.READY;
}

constructor(
config: Config,
config: FlagdGrpcConfig,
client?: ServiceClient,
private logger?: Logger,
) {
Expand All @@ -94,6 +95,7 @@ export class GRPCService implements Service {
? client
: new ServiceClient(socketPath ? `unix://${socketPath}` : `${host}:${port}`, channelCredentials, clientOptions);
this._deadline = config.deadlineMs;
this._streamDeadline = config.streamDeadlineMs;

if (config.cache === 'lru') {
this._cacheEnabled = true;
Expand Down Expand Up @@ -211,7 +213,7 @@ export class GRPCService implements Service {
disconnectCallback: (message: string) => void,
) {
const channel = this._client.getChannel();
channel.watchConnectivityState(channel.getConnectivityState(true), Infinity, () => {
channel.watchConnectivityState(channel.getConnectivityState(true), this._streamDeadline, () => {
this.listen(reconnectCallback, changedCallback, disconnectCallback);
});
}
Expand Down