diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index fbd96378..3460b269 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -44,6 +44,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { let channel: any Channel @usableFromInline let channelHandler: ValkeyChannelHandler + @usableFromInline let configuration: ValkeyConnectionConfiguration let isClosed: Atomic @@ -174,7 +175,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { defer { span?.end() } span?.updateAttributes { attributes in - self.applyCommonAttributes(to: &attributes, commandName: Command.name) + self.applyCommonAttributes(to: &attributes) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Command.name } #endif @@ -227,7 +229,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public func execute( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, any Error>) { - self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands))]) + self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands).string)]) + + #if DistributedTracingSupport + let span = self.tracer?.startSpan("Pipeline", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + let commands = Self.concatenateCommandNames(repeat each commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commands.string + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif // this currently allocates a promise for every command. We could collapse this down to one promise var promises: [EventLoopPromise] = [] @@ -263,7 +279,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public func transaction( _ commands: repeat each Command ) async throws -> sending (repeat Result<(each Command).Response, Error>) { - self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands))]) + self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands).string)]) + + #if DistributedTracingSupport + let span = self.tracer?.startSpan("MULTI", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + let commands = Self.concatenateCommandNames(repeat each commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commands.string + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif + // Construct encoded commands and promise array var encoder = ValkeyCommandEncoder() var promises: [EventLoopPromise] = [] @@ -276,36 +307,46 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { EXEC().encode(into: &encoder) promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) - return try await _execute( - buffer: encoder.buffer, - promises: promises, - valkeyPromises: promises.map { .nio($0) } - ) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), any Error> in - let responses: EXEC.Response - do { - let execFutureResult = promises.last!.futureResult - responses = try await execFutureResult.get().decode(as: EXEC.Response.self) - } catch let error as ValkeyClientError where error.errorCode == .commandError { - // we received an error while running the EXEC command. Extract queuing - // results and throw error - var results: [Result] = .init() - results.reserveCapacity(promises.count - 2) - for promise in promises[1..<(promises.count - 1)] { - results.append(await promise.futureResult._result()) + do { + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), any Error> in + let responses: EXEC.Response + do { + let execFutureResult = promises.last!.futureResult + responses = try await execFutureResult.get().decode(as: EXEC.Response.self) + } catch let error as ValkeyClientError where error.errorCode == .commandError { + // we received an error while running the EXEC command. Extract queuing + // results and throw error + var results: [Result] = .init() + results.reserveCapacity(promises.count - 2) + for promise in promises[1..<(promises.count - 1)] { + results.append(await promise.futureResult._result()) + } + return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error)) + } catch { + return .failure(error) } - return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error)) - } catch { - return .failure(error) - } - // If EXEC returned nil then transaction was aborted because a - // WATCHed variable changed - guard let responses else { - return .failure(ValkeyTransactionError.transactionAborted) + // If EXEC returned nil then transaction was aborted because a + // WATCHed variable changed + guard let responses else { + return .failure(ValkeyTransactionError.transactionAborted) + } + // We convert all the RESP errors in the response array from EXEC to Result.failure + // and attempt to convert the remaining to their respective Response types + return .success(responses.decodeExecResults()) + }.get() + } catch { + #if DistributedTracingSupport + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) } - // We convert all the RESP errors in the response array from EXEC to Result.failure - // and attempt to convert the remaining to their respective Response types - return .success(responses.decodeExecResults()) - }.get() + #endif + throw error + } } /// Pipeline a series of commands to Valkey connection @@ -320,11 +361,24 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { /// - Parameter commands: Collection of ValkeyCommands /// - Returns: Array holding the RESPToken responses of all the commands @inlinable - public func execute( - _ commands: some Collection - ) async -> [Result] { + public func execute( + _ commands: Commands + ) async -> [Result] where Commands.Element == any ValkeyCommand { self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(commands))]) + #if DistributedTracingSupport + let span = self.tracer?.startSpan("Pipeline", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Self.concatenateCommandNames(commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif + // this currently allocates a promise for every command. We could collapse this down to one promise var promises: [EventLoopPromise] = [] promises.reserveCapacity(commands.count) @@ -366,10 +420,24 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { /// - Returns: Array holding the RESPToken responses of all the commands /// - Throws: ValkeyTransactionError when EXEC aborts @inlinable - public func transaction( - _ commands: some Collection - ) async throws -> [Result] { + public func transaction( + _ commands: Commands + ) async throws -> [Result] where Commands.Element == any ValkeyCommand { self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(commands))]) + + #if DistributedTracingSupport + let span = self.tracer?.startSpan("MULTI", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Self.concatenateCommandNames(commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif + // Construct encoded commands and promise array var encoder = ValkeyCommandEncoder() var promises: [EventLoopPromise] = [] @@ -382,12 +450,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { EXEC().encode(into: &encoder) promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) - return try await _execute( - buffer: encoder.buffer, - promises: promises, - valkeyPromises: promises.map { .nio($0) }, - processResults: self._processTransactionPromises - ).get() + do { + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) }, + processResults: self._processTransactionPromises + ).get() + } catch { + #if DistributedTracingSupport + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) + } + #endif + throw error + } } /// Pipeline a series of commands to Valkey connection and precede each command with an ASKING @@ -571,8 +649,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { return commonAttributes } @usableFromInline - func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) { - attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName + func applyCommonAttributes(to attributes: inout SpanAttributes) { attributes.merge(self.commonSpanAttributes) } #endif @@ -587,35 +664,60 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } /// Concatenate names from parameter pack of commands together - @usableFromInline + @inlinable static func concatenateCommandNames( _ commands: repeat each Command - ) -> String { - var string: String = "" + ) -> (string: String, count: Int) { + // get length of string so we only do one allocation + var stringLength = 0 var count = 0 + for command in repeat each commands { + if count == 16 { + stringLength += 3 // length of ellipsis + break + } + stringLength += Swift.type(of: command).name.count + 1 + count += 1 + } + var string: String = "" + string.reserveCapacity(stringLength - 1) + + count = 0 for command in repeat each commands { if count == 0 { string += "\(Swift.type(of: command).name)" } else if count == 16 { string += "..." - break - } else { + } else if count < 16 { string += ",\(Swift.type(of: command).name)" } count += 1 } - return string + return (string, count) } /// Concatenate names from collection of command together - @usableFromInline + @inlinable static func concatenateCommandNames( _ commands: Commands ) -> String where Commands.Element == any ValkeyCommand { + // get length of string so we only do one allocation + var stringLength = 0 + var count = 0 + for command in commands { + if count == 16 { + stringLength += 3 // length of ellipsis + break + } + stringLength += Swift.type(of: command).name.count + 1 + count += 1 + } var string: String = "" + string.reserveCapacity(stringLength - 1) + guard let firstCommand = commands.first else { return "" } string = "\(Swift.type(of: firstCommand).name)" - var count = 1 + count = 1 for command in commands.dropFirst() { if count == 16 { string += "..." diff --git a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift index 1e09968c..df953ec7 100644 --- a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift +++ b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift @@ -179,6 +179,7 @@ public struct ValkeyTracingConfiguration: Sendable { /// Attribute names used in spans created by Valkey. public struct AttributeNames: Sendable { public var databaseOperationName: String = "db.operation.name" + public var databaseOperationBatchSize: String = "db.operation.batch.size" public var databaseSystemName: String = "db.system.name" public var networkPeerAddress: String = "network.peer.address" public var networkPeerPort: String = "network.peer.port" diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index b1c004eb..4796bd2a 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -735,7 +735,7 @@ struct ConnectionTests { #expect(span.status?.code == .error) } - @Test(.disabled("Pipeline support not implemented yet")) + @Test @available(valkeySwift 1.0, *) func testPipelinedSameCommandsSpan() async throws { let tracer = InMemoryTracer() @@ -762,13 +762,13 @@ struct ConnectionTests { #expect(tracer.finishedSpans.count == 1) let span = try #require(tracer.finishedSpans.first) - #expect(span.operationName == "MULTI") + #expect(span.operationName == "Pipeline") #expect(span.kind == .client) #expect(span.errors.isEmpty) #expect( span.attributes == [ "db.system.name": "valkey", - "db.operation.name": "MULTI SET", + "db.operation.name": "SET,SET", "db.operation.batch.size": 2, "server.address": "127.0.0.1", "network.peer.address": "127.0.0.1", @@ -778,7 +778,7 @@ struct ConnectionTests { #expect(span.status == nil) } - @Test(.disabled("Pipeline support not implemented yet")) + @Test @available(valkeySwift 1.0, *) func testPipelinedDifferentCommandsSpan() async throws { let tracer = InMemoryTracer() @@ -803,6 +803,148 @@ struct ConnectionTests { #expect(try await results.1.get().map { String($0) } == "bar") + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) + #expect(span.operationName == "Pipeline") + #expect(span.kind == .client) + #expect(span.errors.isEmpty) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "SET,GET", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.status == nil) + } + + @Test + @available(valkeySwift 1.0, *) + func testPipelinedManyCommands() async throws { + let tracer = InMemoryTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + let commandCount = 24 + let commands: [any ValkeyCommand] = .init(repeating: GET("foo"), count: 24) + async let results = connection.execute(commands) + + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + var buffer = ByteBuffer() + for _ in 0..