Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 155 additions & 53 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
let channel: any Channel
@usableFromInline
let channelHandler: ValkeyChannelHandler
@usableFromInline
let configuration: ValkeyConnectionConfiguration
let isClosed: Atomic<Bool>

Expand Down Expand Up @@ -174,7 +175,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
defer { span?.end() }

span?.updateAttributes { attributes in
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
self.applyCommonAttributes(to: &attributes)
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Command.name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add databaseNumber here as well for better tracing ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not an obvious Otel parameter to use for database number. See https://opentelemetry.io/docs/specs/semconv/registry/attributes/db/

}
#endif

Expand Down Expand Up @@ -227,7 +229,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
public func execute<each Command: ValkeyCommand>(
_ commands: repeat each Command
) async -> sending (repeat Result<(each Command).Response, any Error>) {
self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands))])
self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands).string)])

#if DistributedTracingSupport
let span = self.tracer?.startSpan("Pipeline", ofKind: .client)
defer { span?.end() }

if !(span is NoOpTracer.Span) {
span?.updateAttributes { attributes in
self.applyCommonAttributes(to: &attributes)
let commands = Self.concatenateCommandNames(repeat each commands)
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commands.string
attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count
}
}
#endif

// this currently allocates a promise for every command. We could collapse this down to one promise
var promises: [EventLoopPromise<RESPToken>] = []
Expand Down Expand Up @@ -263,7 +279,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
public func transaction<each Command: ValkeyCommand>(
_ commands: repeat each Command
) async throws -> sending (repeat Result<(each Command).Response, Error>) {
self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands))])
self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(repeat each commands).string)])

#if DistributedTracingSupport
let span = self.tracer?.startSpan("MULTI", ofKind: .client)
defer { span?.end() }

if !(span is NoOpTracer.Span) {
span?.updateAttributes { attributes in
self.applyCommonAttributes(to: &attributes)
let commands = Self.concatenateCommandNames(repeat each commands)
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commands.string
attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count
}
}
#endif

// Construct encoded commands and promise array
var encoder = ValkeyCommandEncoder()
var promises: [EventLoopPromise<RESPToken>] = []
Expand All @@ -276,36 +307,46 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
EXEC().encode(into: &encoder)
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))

return try await _execute(
buffer: encoder.buffer,
promises: promises,
valkeyPromises: promises.map { .nio($0) }
) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), any Error> in
let responses: EXEC.Response
do {
let execFutureResult = promises.last!.futureResult
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
} catch let error as ValkeyClientError where error.errorCode == .commandError {
// we received an error while running the EXEC command. Extract queuing
// results and throw error
var results: [Result<RESPToken, Error>] = .init()
results.reserveCapacity(promises.count - 2)
for promise in promises[1..<(promises.count - 1)] {
results.append(await promise.futureResult._result())
do {
return try await _execute(
buffer: encoder.buffer,
promises: promises,
valkeyPromises: promises.map { .nio($0) }
) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), any Error> in
let responses: EXEC.Response
do {
let execFutureResult = promises.last!.futureResult
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
} catch let error as ValkeyClientError where error.errorCode == .commandError {
// we received an error while running the EXEC command. Extract queuing
// results and throw error
var results: [Result<RESPToken, Error>] = .init()
results.reserveCapacity(promises.count - 2)
for promise in promises[1..<(promises.count - 1)] {
results.append(await promise.futureResult._result())
}
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
} catch {
return .failure(error)
}
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
} catch {
return .failure(error)
}
// If EXEC returned nil then transaction was aborted because a
// WATCHed variable changed
guard let responses else {
return .failure(ValkeyTransactionError.transactionAborted)
// If EXEC returned nil then transaction was aborted because a
// WATCHed variable changed
guard let responses else {
return .failure(ValkeyTransactionError.transactionAborted)
}
// We convert all the RESP errors in the response array from EXEC to Result.failure
// and attempt to convert the remaining to their respective Response types
return .success(responses.decodeExecResults())
}.get()
} catch {
#if DistributedTracingSupport
if let span {
span.recordError(error)
span.setStatus(SpanStatus(code: .error))
}
// We convert all the RESP errors in the response array from EXEC to Result.failure
// and attempt to convert the remaining to their respective Response types
return .success(responses.decodeExecResults())
}.get()
#endif
throw error
}
}

/// Pipeline a series of commands to Valkey connection
Expand All @@ -320,11 +361,24 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
/// - Parameter commands: Collection of ValkeyCommands
/// - Returns: Array holding the RESPToken responses of all the commands
@inlinable
public func execute(
_ commands: some Collection<any ValkeyCommand>
) async -> [Result<RESPToken, any Error>] {
public func execute<Commands: Collection & Sendable>(
_ commands: Commands
) async -> [Result<RESPToken, any Error>] where Commands.Element == any ValkeyCommand {
self.logger.trace("execute", metadata: ["commands": .string(Self.concatenateCommandNames(commands))])

#if DistributedTracingSupport
let span = self.tracer?.startSpan("Pipeline", ofKind: .client)
defer { span?.end() }

if !(span is NoOpTracer.Span) {
span?.updateAttributes { attributes in
self.applyCommonAttributes(to: &attributes)
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Self.concatenateCommandNames(commands)
attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count
}
}
#endif

// this currently allocates a promise for every command. We could collapse this down to one promise
var promises: [EventLoopPromise<RESPToken>] = []
promises.reserveCapacity(commands.count)
Expand Down Expand Up @@ -366,10 +420,24 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
/// - Returns: Array holding the RESPToken responses of all the commands
/// - Throws: ValkeyTransactionError when EXEC aborts
@inlinable
public func transaction(
_ commands: some Collection<any ValkeyCommand>
) async throws -> [Result<RESPToken, Error>] {
public func transaction<Commands: Collection & Sendable>(
_ commands: Commands
) async throws -> [Result<RESPToken, Error>] where Commands.Element == any ValkeyCommand {
self.logger.trace("transaction", metadata: ["commands": .string(Self.concatenateCommandNames(commands))])

#if DistributedTracingSupport
let span = self.tracer?.startSpan("MULTI", ofKind: .client)
defer { span?.end() }

if !(span is NoOpTracer.Span) {
span?.updateAttributes { attributes in
self.applyCommonAttributes(to: &attributes)
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = Self.concatenateCommandNames(commands)
attributes[self.configuration.tracing.attributeNames.databaseOperationBatchSize] = commands.count
}
}
#endif

// Construct encoded commands and promise array
var encoder = ValkeyCommandEncoder()
var promises: [EventLoopPromise<RESPToken>] = []
Expand All @@ -382,12 +450,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
EXEC().encode(into: &encoder)
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))

return try await _execute(
buffer: encoder.buffer,
promises: promises,
valkeyPromises: promises.map { .nio($0) },
processResults: self._processTransactionPromises
).get()
do {
return try await _execute(
buffer: encoder.buffer,
promises: promises,
valkeyPromises: promises.map { .nio($0) },
processResults: self._processTransactionPromises
).get()
} catch {
#if DistributedTracingSupport
if let span {
span.recordError(error)
span.setStatus(SpanStatus(code: .error))
}
#endif
throw error
}
}

/// Pipeline a series of commands to Valkey connection and precede each command with an ASKING
Expand Down Expand Up @@ -571,8 +649,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
return commonAttributes
}
@usableFromInline
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName
func applyCommonAttributes(to attributes: inout SpanAttributes) {
attributes.merge(self.commonSpanAttributes)
}
#endif
Expand All @@ -587,35 +664,60 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
}

/// Concatenate names from parameter pack of commands together
@usableFromInline
@inlinable
static func concatenateCommandNames<each Command: ValkeyCommand>(
_ commands: repeat each Command
) -> String {
var string: String = ""
) -> (string: String, count: Int) {
// get length of string so we only do one allocation
var stringLength = 0
var count = 0
for command in repeat each commands {
if count == 16 {
stringLength += 3 // length of ellipsis
break
}
stringLength += Swift.type(of: command).name.count + 1
count += 1
}
var string: String = ""
string.reserveCapacity(stringLength - 1)

count = 0
for command in repeat each commands {
if count == 0 {
string += "\(Swift.type(of: command).name)"
} else if count == 16 {
string += "..."
break
} else {
} else if count < 16 {
string += ",\(Swift.type(of: command).name)"
}
count += 1
}
return string
return (string, count)
}

/// Concatenate names from collection of command together
@usableFromInline
@inlinable
static func concatenateCommandNames<Commands: Collection>(
_ commands: Commands
) -> String where Commands.Element == any ValkeyCommand {
// get length of string so we only do one allocation
var stringLength = 0
var count = 0
for command in commands {
if count == 16 {
stringLength += 3 // length of ellipsis
break
}
stringLength += Swift.type(of: command).name.count + 1
count += 1
}
var string: String = ""
string.reserveCapacity(stringLength - 1)

guard let firstCommand = commands.first else { return "" }
string = "\(Swift.type(of: firstCommand).name)"
var count = 1
count = 1
for command in commands.dropFirst() {
if count == 16 {
string += "..."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public struct ValkeyTracingConfiguration: Sendable {
/// Attribute names used in spans created by Valkey.
public struct AttributeNames: Sendable {
public var databaseOperationName: String = "db.operation.name"
public var databaseOperationBatchSize: String = "db.operation.batch.size"
public var databaseSystemName: String = "db.system.name"
public var networkPeerAddress: String = "network.peer.address"
public var networkPeerPort: String = "network.peer.port"
Expand Down
Loading
Loading