Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion docs/AsyncTimeouts.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,18 @@ using var cts = CancellationTokenSource.CreateLinkedTokenSource(token); // or mu
cts.CancelAfter(timeout);
await database.StringSetAsync("key", "value").WaitAsync(cts.Token);
var value = await database.StringGetAsync("key").WaitAsync(cts.Token);
``````
```

### Cancelling keys enumeration

Keys being enumerated (via `SCAN`) can *also* be cancelled, using the inbuilt `.WithCancellation(...)` method:

```csharp
CancellationToken token = ...; // for example, from HttpContext.RequestAborted
await foreach (var key in server.KeysAsync(pattern: "*foo*").WithCancellation(token))
{
...
}
```

To use a timeout instead, you can use the `CancellationTokenSource` approach shown above.
6 changes: 3 additions & 3 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ Current package versions:

## Unreleased

- (none)
- Support async cancellation of `SCAN` enumeration ([#2911 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2911))
- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842))
- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912))

## 2.8.47

- Add support for new `BITOP` operations in CE 8.2 ([#2900 by atakavci](https://github.com/StackExchange/StackExchange.Redis/pull/2900))
- Package updates ([#2906 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2906))
- Docs: added [guidance on async timeouts](https://stackexchange.github.io/StackExchange.Redis/AsyncTimeouts) ([#2910 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2910))
- Fix handshake error with `CLIENT ID` ([#2909 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2909))
- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842))
- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912))

## 2.8.41

Expand Down
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/CursorEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ private bool SimpleNext()
{
if (_pageOffset + 1 < _pageCount)
{
cancellationToken.ThrowIfCancellationRequested();
_pageOffset++;
return true;
}
Expand Down Expand Up @@ -274,7 +275,7 @@ private async ValueTask<bool> AwaitedNextAsync(bool isInitial)
ScanResult scanResult;
try
{
scanResult = await pending.ForAwait();
scanResult = await pending.WaitAsync(cancellationToken).ForAwait();
}
catch (Exception ex)
{
Expand Down
38 changes: 38 additions & 0 deletions src/StackExchange.Redis/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,44 @@ internal static Task<T> ObserveErrors<T>(this Task<T> task)
return task;
}

#if !NET6_0_OR_GREATER
// suboptimal polyfill version of the .NET 6+ API, but reasonable for light use
internal static Task<T> WaitAsync<T>(this Task<T> task, CancellationToken cancellationToken)
{
if (task.IsCompleted || !cancellationToken.CanBeCanceled) return task;
return Wrap(task, cancellationToken);

static async Task<T> Wrap(Task<T> task, CancellationToken cancellationToken)
{
var tcs = new TaskSourceWithToken<T>(cancellationToken);
using var reg = cancellationToken.Register(
static state => ((TaskSourceWithToken<T>)state!).Cancel(), tcs);
_ = task.ContinueWith(
static (t, state) =>
{
var tcs = (TaskSourceWithToken<T>)state!;
if (t.IsCanceled) tcs.TrySetCanceled();
else if (t.IsFaulted) tcs.TrySetException(t.Exception!);
else tcs.TrySetResult(t.Result);
},
tcs);
return await tcs.Task;
}
}

// the point of this type is to combine TCS and CT so that we can use a static
// registration via Register
private sealed class TaskSourceWithToken<T> : TaskCompletionSource<T>
{
public TaskSourceWithToken(CancellationToken cancellationToken)
=> _cancellationToken = cancellationToken;

private readonly CancellationToken _cancellationToken;

public void Cancel() => TrySetCanceled(_cancellationToken);
}
#endif

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
39 changes: 39 additions & 0 deletions tests/StackExchange.Redis.Tests/CancellationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public async Task WithCancellation_ValidToken_OperationSucceeds()

private static void Pause(IDatabase db) => db.Execute("client", ["pause", ConnectionPauseMilliseconds], CommandFlags.FireAndForget);

private void Pause(IServer server)
{
server.Execute("client", new object[] { "pause", ConnectionPauseMilliseconds }, CommandFlags.FireAndForget);
}

[Fact]
public async Task WithTimeout_ShortTimeout_Async_ThrowsOperationCanceledException()
{
Expand Down Expand Up @@ -147,4 +152,38 @@ public async Task CancellationDuringOperation_Async_CancelsGracefully(CancelStra
Assert.Equal(cts.Token, oce.CancellationToken);
}
}

[Fact]
public async Task ScanCancellable()
{
using var conn = Create();
var db = conn.GetDatabase();
var server = conn.GetServer(conn.GetEndPoints()[0]);

using var cts = new CancellationTokenSource();

var watch = Stopwatch.StartNew();
Pause(server);
try
{
db.StringSet(Me(), "value", TimeSpan.FromMinutes(5), flags: CommandFlags.FireAndForget);
await using var iter = server.KeysAsync(pageSize: 1000).WithCancellation(cts.Token).GetAsyncEnumerator();
var pending = iter.MoveNextAsync();
Assert.False(cts.Token.IsCancellationRequested);
cts.CancelAfter(ShortDelayMilliseconds); // start this *after* we've got past the initial check
while (await pending)
{
pending = iter.MoveNextAsync();
}
Assert.Fail($"{ExpectedCancel}: {watch.ElapsedMilliseconds}ms");
}
catch (OperationCanceledException oce)
{
var taken = watch.ElapsedMilliseconds;
// Expected if cancellation happens during operation
Log($"Cancelled after {taken}ms");
Assert.True(taken < ConnectionPauseMilliseconds / 2, "Should have cancelled much sooner");
Assert.Equal(cts.Token, oce.CancellationToken);
}
}
}
Loading