diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index b34c1435..ed456ada 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -16,6 +16,7 @@ public struct ConnectionAndMetadata: Sendable { public protocol PooledConnection: AnyObject, Sendable { /// The connections identifier type. associatedtype ID: Hashable & Sendable + associatedtype ConnectionError: Error = any Error /// The connections identifier. The identifier is passed to /// the connection factory method and must stay attached to @@ -37,7 +38,7 @@ public protocol PooledConnection: AnyObject, Sendable { /// } /// } /// ``` - func onClose(_ closure: @escaping @Sendable ((any Error)?) -> ()) + func onClose(_ closure: @escaping @Sendable ((ConnectionError)?) -> ()) /// Close the running connection. Once the close has completed /// closures that were registered in `onClose` must be @@ -78,6 +79,8 @@ public protocol ConnectionRequestProtocol: Sendable { /// The leased connection type associatedtype Connection: PooledConnection + associatedtype ConnectionFactoryError: Error = any Error + /// A connection lease request ID. This ID must be generated /// by users of the `ConnectionPool` outside the /// `ConnectionPool`. It is not generated inside the pool like @@ -88,7 +91,7 @@ public protocol ConnectionRequestProtocol: Sendable { /// A function that is called with a connection or a /// `PoolError`. - func complete(with: Result, ConnectionPoolError>) + func complete(with: Result, ConnectionPoolError>) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -139,24 +142,28 @@ public final class ConnectionPool< Connection: PooledConnection, ConnectionID: Hashable & Sendable, ConnectionIDGenerator: ConnectionIDGeneratorProtocol, + ConnectionFactoryError: Error, Request: ConnectionRequestProtocol, RequestID: Hashable & Sendable, KeepAliveBehavior: ConnectionKeepAliveBehavior, ObservabilityDelegate: ConnectionPoolObservabilityDelegate, - Clock: _Concurrency.Clock + Clock: _Concurrency.Clock, + Instant: InstantProtocol >: Sendable where Connection.ID == ConnectionID, ConnectionIDGenerator.ID == ConnectionID, Request.Connection == Connection, Request.ID == RequestID, + Request.ConnectionFactoryError == ConnectionFactoryError, KeepAliveBehavior.Connection == Connection, ObservabilityDelegate.ConnectionID == ConnectionID, - Clock.Duration == Duration + Clock.Duration == Duration, + Instant == Clock.Instant { - public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool) async throws -> ConnectionAndMetadata + public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool) async throws(ConnectionFactoryError) -> ConnectionAndMetadata @usableFromInline - typealias StateMachine = PoolStateMachine, Clock, Clock.Instant> + typealias StateMachine = PoolStateMachine, Clock, Clock.Instant> @usableFromInline let factory: ConnectionFactory @@ -438,7 +445,7 @@ public final class ConnectionPool< taskGroup.addTask_ { self.observabilityDelegate.startedConnecting(id: request.connectionID) - do { + do throws(ConnectionFactoryError) { let bundle = try await self.factory(request.connectionID, self) self.connectionEstablished(bundle) @@ -471,7 +478,7 @@ public final class ConnectionPool< } @inlinable - /*private*/ func connectionEstablishFailed(_ error: Error, for request: StateMachine.ConnectionRequest) { + /*private*/ func connectionEstablishFailed(_ error: ConnectionFactoryError, for request: StateMachine.ConnectionRequest) { self.observabilityDelegate.connectFailed(id: request.connectionID, error: error) self.modifyStateAndRunActions { state in diff --git a/Sources/ConnectionPoolModule/ConnectionPoolError.swift b/Sources/ConnectionPoolModule/ConnectionPoolError.swift index 5fd867f6..6f91c3ab 100644 --- a/Sources/ConnectionPoolModule/ConnectionPoolError.swift +++ b/Sources/ConnectionPoolModule/ConnectionPoolError.swift @@ -1,31 +1,50 @@ -public struct ConnectionPoolError: Error, Hashable { - @usableFromInline - enum Base: Error, Hashable, Sendable { - case requestCancelled - case poolShutdown - case connectionCreationCircuitBreakerTripped - } +public struct ConnectionPoolError: Error { - @usableFromInline - let base: Base + public struct Code: Sendable, Hashable { + @usableFromInline + enum Base: Error, Hashable, Sendable { + case requestCancelled + case poolShutdown + case connectionCreationCircuitBreakerTripped + } - @inlinable - init(_ base: Base) { self.base = base } + @usableFromInline + let base: Base - /// The connection requests got cancelled - @inlinable - public static var requestCancelled: Self { - ConnectionPoolError(.requestCancelled) - } - /// The connection requests can't be fulfilled as the pool has already been shutdown - @inlinable - public static var poolShutdown: Self { - ConnectionPoolError(.poolShutdown) + @inlinable + init(_ base: Base) { + self.base = base + } + + /// The connection requests got cancelled + @inlinable + public static var requestCancelled: Self { + Code(.requestCancelled) + } + /// The connection requests can't be fulfilled as the pool has already been shutdown + @inlinable + public static var poolShutdown: Self { + Code(.poolShutdown) + } + /// The connection pool has failed to make a connection after a defined time + @inlinable + public static var connectionCreationCircuitBreakerTripped: Self { + Code(.connectionCreationCircuitBreakerTripped) + } } - /// The connection pool has failed to make a connection after a defined time + + public let code: Code + + public let underlying: Underlying? + @inlinable - public static var connectionCreationCircuitBreakerTripped: Self { - ConnectionPoolError(.connectionCreationCircuitBreakerTripped) + init(_ code: Code, underlying: Underlying? = nil) { + self.code = code + self.underlying = underlying } } + +extension ConnectionPoolError: Equatable where Underlying: Equatable {} +extension ConnectionPoolError: Hashable where Underlying: Hashable {} +extension ConnectionPoolError: Sendable where Underlying: Sendable {} diff --git a/Sources/ConnectionPoolModule/ConnectionRequest.swift b/Sources/ConnectionPoolModule/ConnectionRequest.swift index d6654a27..88f97318 100644 --- a/Sources/ConnectionPoolModule/ConnectionRequest.swift +++ b/Sources/ConnectionPoolModule/ConnectionRequest.swift @@ -1,5 +1,6 @@ public struct ConnectionRequest: ConnectionRequestProtocol { + public typealias ConnectionError = any Error public typealias ID = Int public var id: ID @@ -16,7 +17,7 @@ public struct ConnectionRequest: ConnectionRequest self.continuation = continuation } - public func complete(with result: Result, ConnectionPoolError>) { + public func complete(with result: Result, ConnectionPoolError>) { self.continuation.resume(with: result) } } @@ -25,7 +26,7 @@ public struct ConnectionRequest: ConnectionRequest let requestIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator() @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -extension ConnectionPool where Request == ConnectionRequest { +extension ConnectionPool where Request == ConnectionRequest, Clock == ContinuousClock, ConnectionIDGenerator == _ConnectionPoolModule.ConnectionIDGenerator { public convenience init( configuration: ConnectionPoolConfiguration, idGenerator: ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator(), diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index fabb672c..89558a03 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -40,12 +40,20 @@ struct PoolStateMachine< Connection: PooledConnection, ConnectionIDGenerator: ConnectionIDGeneratorProtocol, ConnectionID: Hashable & Sendable, + ConnectionFactoryError: Error, Request: ConnectionRequestProtocol, RequestID, TimerCancellationToken: Sendable, Clock: _Concurrency.Clock, Instant: InstantProtocol ->: Sendable where Connection.ID == ConnectionID, ConnectionIDGenerator.ID == ConnectionID, RequestID == Request.ID, Clock.Duration == Duration, Clock.Instant == Instant { +>: Sendable where + Connection.ID == ConnectionID, + ConnectionIDGenerator.ID == ConnectionID, + RequestID == Request.ID, + Clock.Duration == Duration, + Clock.Instant == Instant, + ConnectionFactoryError == Request.ConnectionFactoryError +{ @usableFromInline struct ConnectionRequest: Hashable, Sendable { @@ -104,8 +112,8 @@ struct PoolStateMachine< enum RequestAction { case leaseConnection(TinyFastSequence, Connection) - case failRequest(Request, ConnectionPoolError) - case failRequests(TinyFastSequence, ConnectionPoolError) + case failRequest(Request, ConnectionPoolError) + case failRequests(TinyFastSequence, ConnectionPoolError) case none } @@ -116,8 +124,8 @@ struct PoolStateMachine< struct ConnectionCreationFailingContext: Sendable { @usableFromInline init( - timeOfFirstFailedAttempt: Clock.Instant, - error: any Error, + timeOfFirstFailedAttempt: Instant, + error: ConnectionFactoryError, connectionIDToRetry: ConnectionID ) { self.timeOfFirstFailedAttempt = timeOfFirstFailedAttempt @@ -128,11 +136,11 @@ struct PoolStateMachine< } @usableFromInline - var timeOfFirstFailedAttempt: Clock.Instant + var timeOfFirstFailedAttempt: Instant @usableFromInline - var firstError: any Error + var firstError: ConnectionFactoryError @usableFromInline - var lastError: any Error + var lastError: ConnectionFactoryError @usableFromInline var numberOfFailedAttempts: Int @usableFromInline @@ -143,7 +151,6 @@ struct PoolStateMachine< struct CircuitBreakerOpenContext: Sendable { @usableFromInline init(_ creationFailingContext: ConnectionCreationFailingContext) { - self.firstError = creationFailingContext.firstError self.lastError = creationFailingContext.lastError self.numberOfFailedAttempts = creationFailingContext.numberOfFailedAttempts @@ -151,9 +158,9 @@ struct PoolStateMachine< } @usableFromInline - var firstError: any Error + var firstError: ConnectionFactoryError @usableFromInline - var lastError: any Error + var lastError: ConnectionFactoryError @usableFromInline var numberOfFailedAttempts: Int @usableFromInline @@ -273,13 +280,13 @@ struct PoolStateMachine< case .circuitBreakOpen: return .init( - request: .failRequest(request, ConnectionPoolError.connectionCreationCircuitBreakerTripped), + request: .failRequest(request, .init(.connectionCreationCircuitBreakerTripped)), connection: .none ) case .shuttingDown, .shutDown: return .init( - request: .failRequest(request, ConnectionPoolError.poolShutdown), + request: .failRequest(request, .init(.poolShutdown)), connection: .none ) } @@ -346,7 +353,7 @@ struct PoolStateMachine< } return .init( - request: .failRequest(request, ConnectionPoolError.requestCancelled), + request: .failRequest(request, .init(.requestCancelled)), connection: .none ) } @@ -422,7 +429,7 @@ struct PoolStateMachine< } @inlinable - mutating func connectionEstablishFailed(_ error: Error, for request: ConnectionRequest) -> Action { + mutating func connectionEstablishFailed(_ error: ConnectionFactoryError, for request: ConnectionRequest) -> Action { switch self.poolState { case .running: self.poolState = .connectionCreationFailing( @@ -447,7 +454,7 @@ struct PoolStateMachine< if creationFailingContext.timeOfFirstFailedAttempt.duration(to: clock.now) > self.configuration.circuitBreakerTripAfter, self.connections.stats.idle + self.connections.stats.leased == 0 { self.poolState = .circuitBreakOpen(.init(creationFailingContext)) - requestAction = .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.connectionCreationCircuitBreakerTripped) + requestAction = .failRequests(self.requestQueue.removeAll(), .init(.connectionCreationCircuitBreakerTripped)) } else { self.poolState = .connectionCreationFailing(creationFailingContext) } @@ -646,13 +653,13 @@ struct PoolStateMachine< if self.connections.isEmpty, shutdown.connections.isEmpty { self.poolState = .shutDown return .init( - request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown), + request: .failRequests(self.requestQueue.removeAll(), .init(.poolShutdown)), connection: .cancelEventStreamAndFinalCleanup(shutdown.timersToCancel) ) } return .init( - request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown), + request: .failRequests(self.requestQueue.removeAll(), .init(.poolShutdown)), connection: .initiateShutdown(shutdown) ) @@ -794,10 +801,16 @@ extension PoolStateMachine { } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {} +extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable, Request.ConnectionFactoryError: Equatable {} + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +extension PoolStateMachine.PoolState.CircuitBreakerOpenContext: Equatable where ConnectionFactoryError: Equatable {} + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +extension PoolStateMachine.PoolState.ConnectionCreationFailingContext: Equatable where ConnectionFactoryError: Equatable {} @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -//extension PoolStateMachine.PoolState: Equatable {} +extension PoolStateMachine.PoolState: Equatable where ConnectionFactoryError: Equatable {} @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable { @@ -838,7 +851,7 @@ extension PoolStateMachine.ConnectionAction.Shutdown: Equatable where TimerCance @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -extension PoolStateMachine.RequestAction: Equatable where Request: Equatable { +extension PoolStateMachine.RequestAction: Equatable where Request: Equatable, ConnectionFactoryError: Equatable { @usableFromInline static func ==(lhs: Self, rhs: Self) -> Bool { diff --git a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift index 936b47cc..50c216e5 100644 --- a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift +++ b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift @@ -5,7 +5,7 @@ import NIOConcurrencyHelpers @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) public final class MockConnectionFactory: Sendable where Clock.Duration == Duration { public typealias ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator - public typealias Request = ConnectionRequest + public typealias Request = MockRequest public typealias KeepAliveBehavior = MockPingPongBehavior public typealias MetricsDelegate = NoOpConnectionPoolMetrics public typealias ConnectionID = Int @@ -37,7 +37,7 @@ public final class MockConnectionFactory: Sendable wh public func makeConnection( id: Int, - for pool: ConnectionPool, NoOpConnectionPoolMetrics, Clock> + for pool: ConnectionPool.ID, MockPingPongBehavior, NoOpConnectionPoolMetrics, Clock, Clock.Instant> ) async throws -> ConnectionAndMetadata { if let autoMaxStreams = self.autoMaxStreams { let connection = MockConnection(id: id) @@ -68,7 +68,7 @@ public final class MockConnectionFactory: Sendable wh } @discardableResult - public func nextConnectAttempt(_ closure: (ConnectionID) async throws -> UInt16) async rethrows -> Connection { + public func nextConnectAttempt(_ closure: (ConnectionID) async throws(MockError) -> UInt16) async rethrows -> Connection { let (connectionID, continuation) = await withCheckedContinuation { (continuation: CheckedContinuation<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>) in let attempt = self.stateBox.withLockedValue { state -> (ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>)? in if let attempt = state.attempts.popFirst() { diff --git a/Sources/ConnectionPoolTestUtils/MockError.swift b/Sources/ConnectionPoolTestUtils/MockError.swift new file mode 100644 index 00000000..f5b4d8f4 --- /dev/null +++ b/Sources/ConnectionPoolTestUtils/MockError.swift @@ -0,0 +1,8 @@ +public struct MockError: Error, Hashable, Sendable { + + public var id: Int + + public init(id: Int = 0) { + self.id = id + } +} diff --git a/Sources/ConnectionPoolTestUtils/MockRequest.swift b/Sources/ConnectionPoolTestUtils/MockRequest.swift index 3dd8b0fb..1c698eba 100644 --- a/Sources/ConnectionPoolTestUtils/MockRequest.swift +++ b/Sources/ConnectionPoolTestUtils/MockRequest.swift @@ -1,6 +1,8 @@ import _ConnectionPoolModule public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { + public typealias ConnectionFactoryError = MockError + public struct ID: Hashable, Sendable { var objectID: ObjectIdentifier @@ -21,7 +23,7 @@ public final class MockRequest: ConnectionRequestP hasher.combine(self.id) } - public func complete(with: Result, ConnectionPoolError>) { + public func complete(with: Result, ConnectionPoolError>) { } } diff --git a/Sources/PostgresNIO/Pool/PostgresClient.swift b/Sources/PostgresNIO/Pool/PostgresClient.swift index 581b5113..cf8389a0 100644 --- a/Sources/PostgresNIO/Pool/PostgresClient.swift +++ b/Sources/PostgresNIO/Pool/PostgresClient.swift @@ -237,11 +237,13 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { PostgresConnection, PostgresConnection.ID, ConnectionIDGenerator, + any Error, ConnectionRequest, ConnectionRequest.ID, PostgresKeepAliveBehavor, PostgresClientMetrics, - ContinuousClock + ContinuousClock, + ContinuousClock.Instant > let pool: Pool @@ -536,7 +538,7 @@ extension PostgresConnection: PooledConnection { extension ConnectionPoolError { func mapToPSQLError(lastConnectError: Error?) -> Error { var psqlError: PSQLError - switch self { + switch self.code { case .poolShutdown: psqlError = PSQLError.poolClosed psqlError.underlying = self