Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/Abstractions/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,28 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
/// </param>
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
/// <inheritdoc cref="WaitForExternalEvent(string, CancellationToken)"/>
public async Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout)
public Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout)
{
return this.WaitForExternalEvent<T>(eventName, timeout, CancellationToken.None);
}

/// <param name="eventName">
/// The name of the event to wait for. Event names are case-insensitive. External event names can be reused any
/// number of times; they are not required to be unique.
/// </param>
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
/// <param name="cancellationToken">A <c>CancellationToken</c> to use to abort waiting for the event.</param>
/// <inheritdoc cref="WaitForExternalEvent(string, CancellationToken)"/>
public async Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout, CancellationToken cancellationToken)
{
// Timeouts are implemented using durable timers.
using CancellationTokenSource timerCts = new();
Task timeoutTask = this.CreateTimer(timeout, timerCts.Token);

using CancellationTokenSource eventCts = new();
// Create a linked cancellation token source from the external cancellation token.
// This allows us to cancel the event wait either when the external token is cancelled
// or when the timeout fires (by calling eventCts.Cancel()).
using CancellationTokenSource eventCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task<T> externalEventTask = this.WaitForExternalEvent<T>(eventName, eventCts.Token);

// Wait for either task to complete and then cancel the one that didn't.
Expand Down
118 changes: 118 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1292,4 +1292,122 @@ public async Task CatchingActivityExceptionsByType()
Assert.Equal("Success", results[2]);
Assert.Equal("Caught base Exception", results[3]);
}

[Fact]
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_EventWins()
{
const string EventName = "TestEvent";
const string EventPayload = "test-payload";
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_EventWins);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
using CancellationTokenSource cts = new();
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromDays(7), cts.Token);
string result = await eventTask;
return result;
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);

// Send event - should complete the wait
await server.Client.RaiseEventAsync(instanceId, EventName, EventPayload);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string? result = metadata.ReadOutputAs<string>();
Assert.Equal(EventPayload, result);
}

[Fact]
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_CancellationWins()
{
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_CancellationWins);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
using CancellationTokenSource cts = new();

// Create two event waiters with cancellation tokens
Task<string> event1Task = ctx.WaitForExternalEvent<string>("Event1", TimeSpan.FromDays(7), cts.Token);

using CancellationTokenSource cts2 = new();
Task<string> event2Task = ctx.WaitForExternalEvent<string>("Event2", TimeSpan.FromDays(7), cts2.Token);

// Wait for any to complete
Task winner = await Task.WhenAny(event1Task, event2Task);

// Cancel the other one
if (winner == event1Task)
{
cts2.Cancel();
return $"Event1: {await event1Task}";
}
else
{
cts.Cancel();
return $"Event2: {await event2Task}";
}
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);

// Send Event1 - should complete and cancel Event2
await server.Client.RaiseEventAsync(instanceId, "Event1", "first-event");

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string? result = metadata.ReadOutputAs<string>();
Assert.Equal("Event1: first-event", result);
}

[Fact]
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_TimeoutWins()
{
const string EventName = "TestEvent";
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_TimeoutWins);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
using CancellationTokenSource cts = new();
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromMilliseconds(500), cts.Token);

try
{
string result = await eventTask;
return $"Event: {result}";
}
catch (OperationCanceledException)
{
return "Timeout occurred";
}
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string? result = metadata.ReadOutputAs<string>();
Assert.Equal("Timeout occurred", result);
}
}
Loading