From f4558b8f126e363d39c1af91036fe6ce0c0d046c Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 18 Dec 2025 14:34:25 +0000 Subject: [PATCH 1/2] Add tracing spans for pipeline and transaction functions Signed-off-by: Adam Fowler --- .../Valkey/Connection/ValkeyConnection.swift | 199 +++++++++++++----- .../ValkeyConnectionConfiguration.swift | 1 + Tests/ValkeyTests/ValkeyConnectionTests.swift | 140 ++++++++++-- 3 files changed, 272 insertions(+), 68 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index fbd96378..623dbc08 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -44,6 +44,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { let channel: any Channel @usableFromInline let channelHandler: ValkeyChannelHandler + @usableFromInline let configuration: ValkeyConnectionConfiguration let isClosed: Atomic @@ -174,7 +175,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { defer { span?.end() } span?.updateAttributes { attributes in - self.applyCommonAttributes(to: &attributes, commandName: Command.name) + self.applyCommonAttributes(to: &attributes) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Command.name } #endif @@ -227,7 +229,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public func execute( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, any Error>) { - self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands))]) + self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands).string)]) + + #if DistributedTracingSupport + let span = self.tracer?.startSpan("Pipeline", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + let commands = Self.concatenateCommandNames(repeat each commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commands.string + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif // this currently allocates a promise for every command. We could collapse this down to one promise var promises: [EventLoopPromise] = [] @@ -263,7 +279,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public func transaction( _ commands: repeat each Command ) async throws -> sending (repeat Result<(each Command).Response, Error>) { - self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands))]) + self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands).string)]) + + #if DistributedTracingSupport + let span = self.tracer?.startSpan("MULTI", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + let commands = Self.concatenateCommandNames(repeat each commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commands.string + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif + // Construct encoded commands and promise array var encoder = ValkeyCommandEncoder() var promises: [EventLoopPromise] = [] @@ -276,36 +307,46 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { EXEC().encode(into: &encoder) promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) - return try await _execute( - buffer: encoder.buffer, - promises: promises, - valkeyPromises: promises.map { .nio($0) } - ) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), any Error> in - let responses: EXEC.Response - do { - let execFutureResult = promises.last!.futureResult - responses = try await execFutureResult.get().decode(as: EXEC.Response.self) - } catch let error as ValkeyClientError where error.errorCode == .commandError { - // we received an error while running the EXEC command. Extract queuing - // results and throw error - var results: [Result] = .init() - results.reserveCapacity(promises.count - 2) - for promise in promises[1..<(promises.count - 1)] { - results.append(await promise.futureResult._result()) + do { + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), any Error> in + let responses: EXEC.Response + do { + let execFutureResult = promises.last!.futureResult + responses = try await execFutureResult.get().decode(as: EXEC.Response.self) + } catch let error as ValkeyClientError where error.errorCode == .commandError { + // we received an error while running the EXEC command. Extract queuing + // results and throw error + var results: [Result] = .init() + results.reserveCapacity(promises.count - 2) + for promise in promises[1..<(promises.count - 1)] { + results.append(await promise.futureResult._result()) + } + return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error)) + } catch { + return .failure(error) } - return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error)) - } catch { - return .failure(error) - } - // If EXEC returned nil then transaction was aborted because a - // WATCHed variable changed - guard let responses else { - return .failure(ValkeyTransactionError.transactionAborted) + // If EXEC returned nil then transaction was aborted because a + // WATCHed variable changed + guard let responses else { + return .failure(ValkeyTransactionError.transactionAborted) + } + // We convert all the RESP errors in the response array from EXEC to Result.failure + // and attempt to convert the remaining to their respective Response types + return .success(responses.decodeExecResults()) + }.get() + } catch { + #if DistributedTracingSupport + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) } - // We convert all the RESP errors in the response array from EXEC to Result.failure - // and attempt to convert the remaining to their respective Response types - return .success(responses.decodeExecResults()) - }.get() + #endif + throw error + } } /// Pipeline a series of commands to Valkey connection @@ -320,11 +361,24 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { /// - Parameter commands: Collection of ValkeyCommands /// - Returns: Array holding the RESPToken responses of all the commands @inlinable - public func execute( - _ commands: some Collection - ) async -> [Result] { + public func execute( + _ commands: Commands + ) async -> [Result] where Commands.Element == any ValkeyCommand { self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(commands))]) + #if DistributedTracingSupport + let span = self.tracer?.startSpan("Pipeline", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Self.concatenateCommandNames(commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif + // this currently allocates a promise for every command. We could collapse this down to one promise var promises: [EventLoopPromise] = [] promises.reserveCapacity(commands.count) @@ -366,10 +420,24 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { /// - Returns: Array holding the RESPToken responses of all the commands /// - Throws: ValkeyTransactionError when EXEC aborts @inlinable - public func transaction( - _ commands: some Collection - ) async throws -> [Result] { + public func transaction( + _ commands: Commands + ) async throws -> [Result] where Commands.Element == any ValkeyCommand { self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(commands))]) + + #if DistributedTracingSupport + let span = self.tracer?.startSpan("MULTI", ofKind: .client) + defer { span?.end() } + + if !(span is NoOpTracer.Span) { + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes) + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Self.concatenateCommandNames(commands) + attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count + } + } + #endif + // Construct encoded commands and promise array var encoder = ValkeyCommandEncoder() var promises: [EventLoopPromise] = [] @@ -382,12 +450,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { EXEC().encode(into: &encoder) promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) - return try await _execute( - buffer: encoder.buffer, - promises: promises, - valkeyPromises: promises.map { .nio($0) }, - processResults: self._processTransactionPromises - ).get() + do { + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) }, + processResults: self._processTransactionPromises + ).get() + } catch { + #if DistributedTracingSupport + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) + } + #endif + throw error + } } /// Pipeline a series of commands to Valkey connection and precede each command with an ASKING @@ -571,8 +649,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { return commonAttributes } @usableFromInline - func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) { - attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName + func applyCommonAttributes(to attributes: inout SpanAttributes) { attributes.merge(self.commonSpanAttributes) } #endif @@ -587,35 +664,51 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } /// Concatenate names from parameter pack of commands together - @usableFromInline + @inlinable static func concatenateCommandNames( _ commands: repeat each Command - ) -> String { + ) -> (string: String, count: Int) { + // get length of string so we only do one allocation + var stringLength = 0 + for command in repeat each commands { + stringLength += Swift.type(of: command).name.count + 1 + } var string: String = "" + string.reserveCapacity(stringLength - 1) + var count = 0 for command in repeat each commands { if count == 0 { string += "\(Swift.type(of: command).name)" - } else if count == 16 { - string += "..." - break } else { string += ",\(Swift.type(of: command).name)" } count += 1 } - return string + return (string, count) } /// Concatenate names from collection of command together - @usableFromInline + @inlinable static func concatenateCommandNames( _ commands: Commands ) -> String where Commands.Element == any ValkeyCommand { + // get length of string so we only do one allocation + var stringLength = 0 + var count = 0 + for command in commands { + if count == 16 { + stringLength += 2 // length of ellipsis - missing comma + break + } + stringLength += Swift.type(of: command).name.count + } var string: String = "" + string.reserveCapacity(stringLength) + guard let firstCommand = commands.first else { return "" } string = "\(Swift.type(of: firstCommand).name)" - var count = 1 + count = 1 for command in commands.dropFirst() { if count == 16 { string += "..." diff --git a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift index 1e09968c..df953ec7 100644 --- a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift +++ b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift @@ -179,6 +179,7 @@ public struct ValkeyTracingConfiguration: Sendable { /// Attribute names used in spans created by Valkey. public struct AttributeNames: Sendable { public var databaseOperationName: String = "db.operation.name" + public var databaseOperationBatchSize: String = "db.operation.batch.size" public var databaseSystemName: String = "db.system.name" public var networkPeerAddress: String = "network.peer.address" public var networkPeerPort: String = "network.peer.port" diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index b1c004eb..d02e7418 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -735,7 +735,7 @@ struct ConnectionTests { #expect(span.status?.code == .error) } - @Test(.disabled("Pipeline support not implemented yet")) + @Test @available(valkeySwift 1.0, *) func testPipelinedSameCommandsSpan() async throws { let tracer = InMemoryTracer() @@ -762,13 +762,13 @@ struct ConnectionTests { #expect(tracer.finishedSpans.count == 1) let span = try #require(tracer.finishedSpans.first) - #expect(span.operationName == "MULTI") + #expect(span.operationName == "Pipeline") #expect(span.kind == .client) #expect(span.errors.isEmpty) #expect( span.attributes == [ "db.system.name": "valkey", - "db.operation.name": "MULTI SET", + "db.operation.name": "SET,SET", "db.operation.batch.size": 2, "server.address": "127.0.0.1", "network.peer.address": "127.0.0.1", @@ -778,7 +778,7 @@ struct ConnectionTests { #expect(span.status == nil) } - @Test(.disabled("Pipeline support not implemented yet")) + @Test @available(valkeySwift 1.0, *) func testPipelinedDifferentCommandsSpan() async throws { let tracer = InMemoryTracer() @@ -803,6 +803,54 @@ struct ConnectionTests { #expect(try await results.1.get().map { String($0) } == "bar") + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) + #expect(span.operationName == "Pipeline") + #expect(span.kind == .client) + #expect(span.errors.isEmpty) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "SET,GET", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.status == nil) + } + + @Test + @available(valkeySwift 1.0, *) + func testTransactionSameCommandsSpan() async throws { + let tracer = InMemoryTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + async let results = connection.transaction( + SET("foo", value: "bar"), + SET("bar", value: "foo") + ) + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + var buffer = ByteBuffer() + buffer.writeImmutableBuffer(RESPToken(.command(["MULTI"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["SET", "foo", "bar"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["SET", "bar", "foo"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["EXEC"])).base) + #expect(outbound == buffer) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.array([.simpleString("OK"), .simpleString("OK")])).base) + + #expect(try await results.1.get().map { String($0) } == "OK") + #expect(tracer.finishedSpans.count == 1) let span = try #require(tracer.finishedSpans.first) #expect(span.operationName == "MULTI") @@ -811,7 +859,7 @@ struct ConnectionTests { #expect( span.attributes == [ "db.system.name": "valkey", - "db.operation.name": "MULTI", + "db.operation.name": "SET,SET", "db.operation.batch.size": 2, "server.address": "127.0.0.1", "network.peer.address": "127.0.0.1", @@ -821,9 +869,9 @@ struct ConnectionTests { #expect(span.status == nil) } - @Test(.disabled("Pipeline support not implemented yet")) + @Test @available(valkeySwift 1.0, *) - func testPipelinedCommandFailureSpan() async throws { + func testTransactionDifferentCommandsSpan() async throws { let tracer = InMemoryTracer() var config = ValkeyConnectionConfiguration() config.tracing.tracer = tracer @@ -833,27 +881,34 @@ struct ConnectionTests { let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) try await channel.processHello() - async let results = connection.execute( + async let results = connection.transaction( SET("foo", value: "bar"), GET("foo") ) - _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + var buffer = ByteBuffer() + buffer.writeImmutableBuffer(RESPToken(.command(["MULTI"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["SET", "foo", "bar"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["GET", "foo"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["EXEC"])).base) + #expect(outbound == buffer) try await channel.writeInbound(RESPToken(.simpleString("OK")).base) - try await channel.writeInbound(RESPToken(.simpleError("WRONGTYPE Error!")).base) - _ = await results + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.array([.simpleString("OK"), .bulkString("bar")])).base) + + #expect(try await results.1.get().map { String($0) } == "bar") #expect(tracer.finishedSpans.count == 1) let span = try #require(tracer.finishedSpans.first) #expect(span.operationName == "MULTI") #expect(span.kind == .client) - #expect(span.errors.count == 1) - let error = try #require(span.errors.first) - #expect(error.error as? ValkeyClientError == ValkeyClientError(.commandError, message: "WRONGTYPE Error!")) + #expect(span.errors.isEmpty) #expect( span.attributes == [ "db.system.name": "valkey", - "db.operation.name": "MULTI", + "db.operation.name": "SET,GET", "db.operation.batch.size": 2, "server.address": "127.0.0.1", "network.peer.address": "127.0.0.1", @@ -862,6 +917,61 @@ struct ConnectionTests { ) #expect(span.status == nil) } + + @Test + @available(valkeySwift 1.0, *) + func testTransactionCommandFailureSpan() async throws { + let tracer = InMemoryTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + async let results = connection.transaction( + SET("foo", value: "bar"), + GET("foo") + ) + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + var buffer = ByteBuffer() + buffer.writeImmutableBuffer(RESPToken(.command(["MULTI"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["SET", "foo", "bar"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["GET", "foo"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["EXEC"])).base) + #expect(outbound == buffer) + + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleError("EXECABORT")).base) + _ = try? await results + + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) + #expect(span.operationName == "MULTI") + #expect(span.kind == .client) + #expect(span.errors.count == 1) + let error = try #require(span.errors.first?.error as? ValkeyTransactionError) + switch error { + case .transactionErrors(_, let execError as ValkeyClientError): + #expect(execError == ValkeyClientError(.commandError, message: "EXECABORT")) + default: + Issue.record() + } + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "SET,GET", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.status == .init(code: .error)) + } } #endif } From 4339d57a6178eafb90e2a7ea1ac9a292c39bffae Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 19 Dec 2025 09:11:10 +0000 Subject: [PATCH 2/2] PR comments Signed-off-by: Adam Fowler --- .../Valkey/Connection/ValkeyConnection.swift | 19 +++++--- Tests/ValkeyTests/ValkeyConnectionTests.swift | 45 +++++++++++++++++++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 623dbc08..3460b269 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -670,17 +670,25 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { ) -> (string: String, count: Int) { // get length of string so we only do one allocation var stringLength = 0 + var count = 0 for command in repeat each commands { + if count == 16 { + stringLength += 3 // length of ellipsis + break + } stringLength += Swift.type(of: command).name.count + 1 + count += 1 } var string: String = "" string.reserveCapacity(stringLength - 1) - var count = 0 + count = 0 for command in repeat each commands { if count == 0 { string += "\(Swift.type(of: command).name)" - } else { + } else if count == 16 { + string += "..." + } else if count < 16 { string += ",\(Swift.type(of: command).name)" } count += 1 @@ -698,13 +706,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { var count = 0 for command in commands { if count == 16 { - stringLength += 2 // length of ellipsis - missing comma + stringLength += 3 // length of ellipsis break } - stringLength += Swift.type(of: command).name.count + stringLength += Swift.type(of: command).name.count + 1 + count += 1 } var string: String = "" - string.reserveCapacity(stringLength) + string.reserveCapacity(stringLength - 1) guard let firstCommand = commands.first else { return "" } string = "\(Swift.type(of: firstCommand).name)" diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index d02e7418..4796bd2a 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -821,6 +821,51 @@ struct ConnectionTests { #expect(span.status == nil) } + @Test + @available(valkeySwift 1.0, *) + func testPipelinedManyCommands() async throws { + let tracer = InMemoryTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + let commandCount = 24 + let commands: [any ValkeyCommand] = .init(repeating: GET("foo"), count: 24) + async let results = connection.execute(commands) + + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + var buffer = ByteBuffer() + for _ in 0..