Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ Current package versions:
- Support Redis 8.4 CAS/CAD operations (`DIGEST`, and the `IFEQ`, `IFNE`, `IFDEQ`, `IFDNE` modifiers on `SET` / `DEL`)
via the new `ValueCondition` abstraction, and use CAS/CAD operations for `Lock*` APIs when possible ([#2978 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2978))
- **note**: overload resolution for `StringSet[Async]` may be impacted in niche cases, requiring trivial build changes (there are no runtime-breaking changes such as missing methods)
- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972))
- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972))
- Support `MSETEX` (Redis 8.4.0) for multi-key operations with expiration ([#2977 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2977))
- Support `NOMKSTREAM` option in `StreamAdd` methods via `createStream` parameter (requires Redis 6.2.0+)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in ## unreleased or similar


## 2.9.32

Expand Down
32 changes: 32 additions & 0 deletions docs/Streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,38 @@ You also have the option to override the auto-generated message ID by passing yo
db.StreamAdd("events_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
```

Conditional Stream Creation with NOMKSTREAM
---

By default, `StreamAdd` automatically creates the stream if it doesn't exist. Starting with Redis 6.2.0, you can prevent automatic stream creation using the `createStream` parameter:

```csharp
// This will return null if the stream doesn't exist
var messageId = db.StreamAdd(
"mystream",
"field",
"value",
createStream: false);

if (messageId.IsNull)
{
Console.WriteLine("Stream does not exist, message was not added.");
}
else
{
Console.WriteLine($"Message added with ID: {messageId}");
}
```

**Use cases**:
- **Producer-consumer scenarios**: Only add messages if a consumer has registered (by creating the stream or consumer group)
- **Prevent typos**: Avoid accidentally creating streams with misspelled keys
- **Conditional writes**: Only write to pre-existing streams

**Requirements**:
- Redis 6.2.0 or higher
- When `createStream: false` and the stream doesn't exist, the method returns `RedisValue.Null`

Reading from Streams
===

Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2655,13 +2655,14 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="createStream">When false, the stream will not be created if it does not exist, and the command returns null. When true (default), the stream is created if it does not exist. Requires Redis 6.2.0+.</param>
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
Expand All @@ -2674,13 +2675,14 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="createStream">When false, the stream will not be created if it does not exist, and the command returns null. When true (default), the stream is created if it does not exist. Requires Redis 6.2.0+.</param>
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
Expand Down
8 changes: 4 additions & 4 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -650,11 +650,11 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);

#pragma warning disable RS0026 // similar overloads
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, long?, bool, bool, long?, StreamTrimMode, CommandFlags)"/>
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, long?, bool, bool, long?, StreamTrimMode, CommandFlags)"/>
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <inheritdoc cref="IDatabase.StreamAutoClaim(RedisKey, RedisValue, RedisValue, long, RedisValue, int?, CommandFlags)"/>
Expand Down
8 changes: 4 additions & 4 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,11 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, Red
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);

public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);

public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);

public Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,11 +597,11 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);

public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);

public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);

public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);
Expand Down
Loading
Loading