Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public struct ConnectionAndMetadata<Connection: PooledConnection>: Sendable {
public protocol PooledConnection: AnyObject, Sendable {
/// The connections identifier type.
associatedtype ID: Hashable & Sendable
associatedtype ConnectionError: Error = any Error

/// The connections identifier. The identifier is passed to
/// the connection factory method and must stay attached to
Expand All @@ -37,7 +38,7 @@ public protocol PooledConnection: AnyObject, Sendable {
/// }
/// }
/// ```
func onClose(_ closure: @escaping @Sendable ((any Error)?) -> ())
func onClose(_ closure: @escaping @Sendable ((ConnectionError)?) -> ())

/// Close the running connection. Once the close has completed
/// closures that were registered in `onClose` must be
Expand Down Expand Up @@ -78,6 +79,8 @@ public protocol ConnectionRequestProtocol: Sendable {
/// The leased connection type
associatedtype Connection: PooledConnection

associatedtype ConnectionFactoryError: Error = any Error

/// A connection lease request ID. This ID must be generated
/// by users of the `ConnectionPool` outside the
/// `ConnectionPool`. It is not generated inside the pool like
Expand All @@ -88,7 +91,7 @@ public protocol ConnectionRequestProtocol: Sendable {

/// A function that is called with a connection or a
/// `PoolError`.
func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>)
func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError<ConnectionFactoryError>>)
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
Expand Down Expand Up @@ -139,24 +142,28 @@ public final class ConnectionPool<
Connection: PooledConnection,
ConnectionID: Hashable & Sendable,
ConnectionIDGenerator: ConnectionIDGeneratorProtocol,
ConnectionFactoryError: Error,
Request: ConnectionRequestProtocol,
RequestID: Hashable & Sendable,
KeepAliveBehavior: ConnectionKeepAliveBehavior,
ObservabilityDelegate: ConnectionPoolObservabilityDelegate,
Clock: _Concurrency.Clock
Clock: _Concurrency.Clock,
Instant: InstantProtocol
>: Sendable where
Connection.ID == ConnectionID,
ConnectionIDGenerator.ID == ConnectionID,
Request.Connection == Connection,
Request.ID == RequestID,
Request.ConnectionFactoryError == ConnectionFactoryError,
KeepAliveBehavior.Connection == Connection,
ObservabilityDelegate.ConnectionID == ConnectionID,
Clock.Duration == Duration
Clock.Duration == Duration,
Instant == Clock.Instant
{
public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool<Connection, ConnectionID, ConnectionIDGenerator, Request, RequestID, KeepAliveBehavior, ObservabilityDelegate, Clock>) async throws -> ConnectionAndMetadata<Connection>
public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool<Connection, ConnectionID, ConnectionIDGenerator, ConnectionFactoryError, Request, RequestID, KeepAliveBehavior, ObservabilityDelegate, Clock, Instant>) async throws(ConnectionFactoryError) -> ConnectionAndMetadata<Connection>

@usableFromInline
typealias StateMachine = PoolStateMachine<Connection, ConnectionIDGenerator, ConnectionID, Request, Request.ID, CheckedContinuation<Void, Never>, Clock, Clock.Instant>
typealias StateMachine = PoolStateMachine<Connection, ConnectionIDGenerator, ConnectionID, ConnectionFactoryError, Request, Request.ID, CheckedContinuation<Void, Never>, Clock, Clock.Instant>

@usableFromInline
let factory: ConnectionFactory
Expand Down Expand Up @@ -438,7 +445,7 @@ public final class ConnectionPool<
taskGroup.addTask_ {
self.observabilityDelegate.startedConnecting(id: request.connectionID)

do {
do throws(ConnectionFactoryError) {
let bundle = try await self.factory(request.connectionID, self)
self.connectionEstablished(bundle)

Expand Down Expand Up @@ -471,7 +478,7 @@ public final class ConnectionPool<
}

@inlinable
/*private*/ func connectionEstablishFailed(_ error: Error, for request: StateMachine.ConnectionRequest) {
/*private*/ func connectionEstablishFailed(_ error: ConnectionFactoryError, for request: StateMachine.ConnectionRequest) {
self.observabilityDelegate.connectFailed(id: request.connectionID, error: error)

self.modifyStateAndRunActions { state in
Expand Down
65 changes: 42 additions & 23 deletions Sources/ConnectionPoolModule/ConnectionPoolError.swift
Original file line number Diff line number Diff line change
@@ -1,31 +1,50 @@

public struct ConnectionPoolError: Error, Hashable {
@usableFromInline
enum Base: Error, Hashable, Sendable {
case requestCancelled
case poolShutdown
case connectionCreationCircuitBreakerTripped
}
public struct ConnectionPoolError<Underlying: Error>: Error {

@usableFromInline
let base: Base
public struct Code: Sendable, Hashable {
@usableFromInline
enum Base: Error, Hashable, Sendable {
case requestCancelled
case poolShutdown
case connectionCreationCircuitBreakerTripped
}

@inlinable
init(_ base: Base) { self.base = base }
@usableFromInline
let base: Base

/// The connection requests got cancelled
@inlinable
public static var requestCancelled: Self {
ConnectionPoolError(.requestCancelled)
}
/// The connection requests can't be fulfilled as the pool has already been shutdown
@inlinable
public static var poolShutdown: Self {
ConnectionPoolError(.poolShutdown)
@inlinable
init(_ base: Base) {
self.base = base
}

/// The connection requests got cancelled
@inlinable
public static var requestCancelled: Self {
Code(.requestCancelled)
}
/// The connection requests can't be fulfilled as the pool has already been shutdown
@inlinable
public static var poolShutdown: Self {
Code(.poolShutdown)
}
/// The connection pool has failed to make a connection after a defined time
@inlinable
public static var connectionCreationCircuitBreakerTripped: Self {
Code(.connectionCreationCircuitBreakerTripped)
}
}
/// The connection pool has failed to make a connection after a defined time

public let code: Code

public let underlying: Underlying?

@inlinable
public static var connectionCreationCircuitBreakerTripped: Self {
ConnectionPoolError(.connectionCreationCircuitBreakerTripped)
init(_ code: Code, underlying: Underlying? = nil) {
self.code = code
self.underlying = underlying
}
}

extension ConnectionPoolError: Equatable where Underlying: Equatable {}
extension ConnectionPoolError: Hashable where Underlying: Hashable {}
extension ConnectionPoolError: Sendable where Underlying: Sendable {}
5 changes: 3 additions & 2 deletions Sources/ConnectionPoolModule/ConnectionRequest.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequestProtocol {
public typealias ConnectionError = any Error
public typealias ID = Int

public var id: ID
Expand All @@ -16,7 +17,7 @@ public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequest
self.continuation = continuation
}

public func complete(with result: Result<ConnectionLease<Connection>, ConnectionPoolError>) {
public func complete(with result: Result<ConnectionLease<Connection>, ConnectionPoolError<any Error>>) {
self.continuation.resume(with: result)
}
}
Expand All @@ -25,7 +26,7 @@ public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequest
let requestIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator()

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension ConnectionPool where Request == ConnectionRequest<Connection> {
extension ConnectionPool where Request == ConnectionRequest<Connection>, Clock == ContinuousClock, ConnectionIDGenerator == _ConnectionPoolModule.ConnectionIDGenerator {
public convenience init(
configuration: ConnectionPoolConfiguration,
idGenerator: ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator(),
Expand Down
55 changes: 34 additions & 21 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,20 @@ struct PoolStateMachine<
Connection: PooledConnection,
ConnectionIDGenerator: ConnectionIDGeneratorProtocol,
ConnectionID: Hashable & Sendable,
ConnectionFactoryError: Error,
Request: ConnectionRequestProtocol,
RequestID,
TimerCancellationToken: Sendable,
Clock: _Concurrency.Clock,
Instant: InstantProtocol
>: Sendable where Connection.ID == ConnectionID, ConnectionIDGenerator.ID == ConnectionID, RequestID == Request.ID, Clock.Duration == Duration, Clock.Instant == Instant {
>: Sendable where
Connection.ID == ConnectionID,
ConnectionIDGenerator.ID == ConnectionID,
RequestID == Request.ID,
Clock.Duration == Duration,
Clock.Instant == Instant,
ConnectionFactoryError == Request.ConnectionFactoryError
{

@usableFromInline
struct ConnectionRequest: Hashable, Sendable {
Expand Down Expand Up @@ -104,8 +112,8 @@ struct PoolStateMachine<
enum RequestAction {
case leaseConnection(TinyFastSequence<Request>, Connection)

case failRequest(Request, ConnectionPoolError)
case failRequests(TinyFastSequence<Request>, ConnectionPoolError)
case failRequest(Request, ConnectionPoolError<ConnectionFactoryError>)
case failRequests(TinyFastSequence<Request>, ConnectionPoolError<ConnectionFactoryError>)

case none
}
Expand All @@ -116,8 +124,8 @@ struct PoolStateMachine<
struct ConnectionCreationFailingContext: Sendable {
@usableFromInline
init(
timeOfFirstFailedAttempt: Clock.Instant,
error: any Error,
timeOfFirstFailedAttempt: Instant,
error: ConnectionFactoryError,
connectionIDToRetry: ConnectionID
) {
self.timeOfFirstFailedAttempt = timeOfFirstFailedAttempt
Expand All @@ -128,11 +136,11 @@ struct PoolStateMachine<
}

@usableFromInline
var timeOfFirstFailedAttempt: Clock.Instant
var timeOfFirstFailedAttempt: Instant
@usableFromInline
var firstError: any Error
var firstError: ConnectionFactoryError
@usableFromInline
var lastError: any Error
var lastError: ConnectionFactoryError
@usableFromInline
var numberOfFailedAttempts: Int
@usableFromInline
Expand All @@ -143,17 +151,16 @@ struct PoolStateMachine<
struct CircuitBreakerOpenContext: Sendable {
@usableFromInline
init(_ creationFailingContext: ConnectionCreationFailingContext) {

self.firstError = creationFailingContext.firstError
self.lastError = creationFailingContext.lastError
self.numberOfFailedAttempts = creationFailingContext.numberOfFailedAttempts
self.connectionIDToRetry = creationFailingContext.connectionIDToRetry
}

@usableFromInline
var firstError: any Error
var firstError: ConnectionFactoryError
@usableFromInline
var lastError: any Error
var lastError: ConnectionFactoryError
@usableFromInline
var numberOfFailedAttempts: Int
@usableFromInline
Expand Down Expand Up @@ -273,13 +280,13 @@ struct PoolStateMachine<

case .circuitBreakOpen:
return .init(
request: .failRequest(request, ConnectionPoolError.connectionCreationCircuitBreakerTripped),
request: .failRequest(request, .init(.connectionCreationCircuitBreakerTripped)),
connection: .none
)

case .shuttingDown, .shutDown:
return .init(
request: .failRequest(request, ConnectionPoolError.poolShutdown),
request: .failRequest(request, .init(.poolShutdown)),
connection: .none
)
}
Expand Down Expand Up @@ -346,7 +353,7 @@ struct PoolStateMachine<
}

return .init(
request: .failRequest(request, ConnectionPoolError.requestCancelled),
request: .failRequest(request, .init(.requestCancelled)),
connection: .none
)
}
Expand Down Expand Up @@ -422,7 +429,7 @@ struct PoolStateMachine<
}

@inlinable
mutating func connectionEstablishFailed(_ error: Error, for request: ConnectionRequest) -> Action {
mutating func connectionEstablishFailed(_ error: ConnectionFactoryError, for request: ConnectionRequest) -> Action {
switch self.poolState {
case .running:
self.poolState = .connectionCreationFailing(
Expand All @@ -447,7 +454,7 @@ struct PoolStateMachine<
if creationFailingContext.timeOfFirstFailedAttempt.duration(to: clock.now) > self.configuration.circuitBreakerTripAfter,
self.connections.stats.idle + self.connections.stats.leased == 0 {
self.poolState = .circuitBreakOpen(.init(creationFailingContext))
requestAction = .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.connectionCreationCircuitBreakerTripped)
requestAction = .failRequests(self.requestQueue.removeAll(), .init(.connectionCreationCircuitBreakerTripped))
} else {
self.poolState = .connectionCreationFailing(creationFailingContext)
}
Expand Down Expand Up @@ -646,13 +653,13 @@ struct PoolStateMachine<
if self.connections.isEmpty, shutdown.connections.isEmpty {
self.poolState = .shutDown
return .init(
request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown),
request: .failRequests(self.requestQueue.removeAll(), .init(.poolShutdown)),
connection: .cancelEventStreamAndFinalCleanup(shutdown.timersToCancel)
)
}

return .init(
request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown),
request: .failRequests(self.requestQueue.removeAll(), .init(.poolShutdown)),
connection: .initiateShutdown(shutdown)
)

Expand Down Expand Up @@ -794,10 +801,16 @@ extension PoolStateMachine {
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {}
extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable, Request.ConnectionFactoryError: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.PoolState.CircuitBreakerOpenContext: Equatable where ConnectionFactoryError: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.PoolState.ConnectionCreationFailingContext: Equatable where ConnectionFactoryError: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
//extension PoolStateMachine.PoolState: Equatable {}
extension PoolStateMachine.PoolState: Equatable where ConnectionFactoryError: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable {
Expand Down Expand Up @@ -838,7 +851,7 @@ extension PoolStateMachine.ConnectionAction.Shutdown: Equatable where TimerCance


@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.RequestAction: Equatable where Request: Equatable {
extension PoolStateMachine.RequestAction: Equatable where Request: Equatable, ConnectionFactoryError: Equatable {

@usableFromInline
static func ==(lhs: Self, rhs: Self) -> Bool {
Expand Down
6 changes: 3 additions & 3 deletions Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import NIOConcurrencyHelpers
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
public final class MockConnectionFactory<Clock: _Concurrency.Clock>: Sendable where Clock.Duration == Duration {
public typealias ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator
public typealias Request = ConnectionRequest<MockConnection>
public typealias Request = MockRequest<MockConnection>
public typealias KeepAliveBehavior = MockPingPongBehavior
public typealias MetricsDelegate = NoOpConnectionPoolMetrics<Int>
public typealias ConnectionID = Int
Expand Down Expand Up @@ -37,7 +37,7 @@ public final class MockConnectionFactory<Clock: _Concurrency.Clock>: Sendable wh

public func makeConnection(
id: Int,
for pool: ConnectionPool<MockConnection, Int, ConnectionIDGenerator, some ConnectionRequestProtocol, Int, MockPingPongBehavior<MockConnection>, NoOpConnectionPoolMetrics<Int>, Clock>
for pool: ConnectionPool<MockConnection, Int, ConnectionIDGenerator, MockError, Request, MockRequest<MockConnection>.ID, MockPingPongBehavior<MockConnection>, NoOpConnectionPoolMetrics<Int>, Clock, Clock.Instant>
) async throws -> ConnectionAndMetadata<MockConnection> {
if let autoMaxStreams = self.autoMaxStreams {
let connection = MockConnection(id: id)
Expand Down Expand Up @@ -68,7 +68,7 @@ public final class MockConnectionFactory<Clock: _Concurrency.Clock>: Sendable wh
}

@discardableResult
public func nextConnectAttempt(_ closure: (ConnectionID) async throws -> UInt16) async rethrows -> Connection {
public func nextConnectAttempt(_ closure: (ConnectionID) async throws(MockError) -> UInt16) async rethrows -> Connection {
let (connectionID, continuation) = await withCheckedContinuation { (continuation: CheckedContinuation<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>) in
let attempt = self.stateBox.withLockedValue { state -> (ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>)? in
if let attempt = state.attempts.popFirst() {
Expand Down
8 changes: 8 additions & 0 deletions Sources/ConnectionPoolTestUtils/MockError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
public struct MockError: Error, Hashable, Sendable {

public var id: Int

public init(id: Int = 0) {
self.id = id
}
}
Loading
Loading