Skip to content

Commit a4211e7

Browse files
feat(NODE-7333): add support for deprioritized servers to all topologies (#4821)
1 parent c58ca1f commit a4211e7

File tree

63 files changed

+2524
-246
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+2524
-246
lines changed

src/change_stream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { MongoClient } from './mongo_client';
1818
import { type InferIdType, TypedEventEmitter } from './mongo_types';
1919
import type { AggregateOptions } from './operations/aggregate';
2020
import type { OperationParent } from './operations/command';
21+
import { DeprioritizedServers } from './sdam/server_selection';
2122
import type { ServerSessionId } from './sessions';
2223
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
2324
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
@@ -1073,7 +1074,8 @@ export class ChangeStream<
10731074
try {
10741075
await topology.selectServer(this.cursor.readPreference, {
10751076
operationName: 'reconnect topology in change stream',
1076-
timeoutContext: this.timeoutContext
1077+
timeoutContext: this.timeoutContext,
1078+
deprioritizedServers: new DeprioritizedServers()
10771079
});
10781080
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
10791081
} catch {

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ export type {
588588
TagSet,
589589
TopologyVersion
590590
} from './sdam/server_description';
591-
export type { ServerSelector } from './sdam/server_selection';
591+
export type { DeprioritizedServers, ServerSelector } from './sdam/server_selection';
592592
export type { SrvPoller, SrvPollerEvents, SrvPollerOptions } from './sdam/srv_polling';
593593
export type {
594594
ConnectOptions,

src/mongo_client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_conc
4848
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
4949
import type { ServerMonitoringMode } from './sdam/monitor';
5050
import type { TagSet } from './sdam/server_description';
51-
import { readPreferenceServerSelector } from './sdam/server_selection';
51+
import { DeprioritizedServers, readPreferenceServerSelector } from './sdam/server_selection';
5252
import type { SrvPoller } from './sdam/srv_polling';
5353
import { Topology, type TopologyEvents } from './sdam/topology';
5454
import { ClientSession, type ClientSessionOptions, ServerSessionPool } from './sessions';
@@ -789,7 +789,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
789789
// to avoid the server selection timeout.
790790
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
791791
const serverDescriptions = Array.from(topologyDescription.servers.values());
792-
const servers = selector(topologyDescription, serverDescriptions);
792+
const servers = selector(topologyDescription, serverDescriptions, new DeprioritizedServers());
793793
if (servers.length !== 0) {
794794
const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id);
795795
if (endSessions.length !== 0) {

src/operations/execute_operation.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import {
1717
} from '../error';
1818
import type { MongoClient } from '../mongo_client';
1919
import { ReadPreference } from '../read_preference';
20-
import type { ServerDescription } from '../sdam/server_description';
2120
import {
21+
DeprioritizedServers,
2222
sameServerSelector,
2323
secondaryWritableServerSelector,
2424
type ServerSelector
@@ -207,7 +207,8 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
207207
session,
208208
operationName: operation.commandName,
209209
timeoutContext,
210-
signal: operation.options.signal
210+
signal: operation.options.signal,
211+
deprioritizedServers: new DeprioritizedServers()
211212
});
212213

213214
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
@@ -234,7 +235,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
234235

235236
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
236237
let previousOperationError: MongoError | undefined;
237-
let previousServer: ServerDescription | undefined;
238+
const deprioritizedServers = new DeprioritizedServers();
238239

239240
for (let tries = 0; tries < maxTries; tries++) {
240241
if (previousOperationError) {
@@ -270,7 +271,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
270271
server = await topology.selectServer(selector, {
271272
session,
272273
operationName: operation.commandName,
273-
previousServer,
274+
deprioritizedServers,
274275
signal: operation.options.signal
275276
});
276277

@@ -303,7 +304,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
303304
) {
304305
throw previousOperationError;
305306
}
306-
previousServer = server.description;
307+
deprioritizedServers.add(server.description);
307308
previousOperationError = operationError;
308309

309310
// Reset timeouts

0 commit comments

Comments
 (0)