Durable Functions
Durable Functions
Why Durable Functions?
When using Azure functions we usually deal with a strict stateless model. Each function execution is independent from other executions. This is a key factor why it is so easy to scale functions.
However using Azure functions we are limited:
- An Azure function has to complete within a certain time frame (e.g. max 10min for Consumption plan)
- We cannot model complex workflows or long-running operations.
Consider Durable Functions when
- You need to coordinate the execution of multiple functions and manage complex control flow and error handling.
- Your application requires stateful execution and the ability to pause, resume, or retry workflows.
- Human interaction, such as approvals or notifications, is an integral part of your application’s workflow.
Building blocks
- Orchestrator
- Orchestrates the function execution.
- Can call other functions. The output can be stored in local variables.
- Can be long-running. Lifespan of an orchestrator function can be from milliseconds to never-ending.
- Activity
- Are the units of work within a durable functions app.
- Are called by an orchestrator.
- External Event
- Are events triggered by any external client.
- Supports bindings and HTTP API.
Orchestrator sample
[FunctionName("HelloCities")]public static async Task<List<string>> Run( [OrchestrationTrigger] IDurableOrchestrationContext context){ var outputs = new List<string>();
outputs.Add(await context .CallActivityAsync<string>("SayHello", "Tokyo")); outputs.Add(await context .CallActivityAsync<string>("SayHello", "Seattle")); outputs.Add(await context .CallActivityAsync<string>("SayHello", "London"));
return outputs;}
Activity sample
[FunctionName("SayHello")]public static string SayHello( [ActivityTrigger] IDurableActivityContext helloContext){ string name = helloContext.GetInput<string>(); return $"Hello {name}!";}
External event sample
[FunctionName("BudgetApproval")]public static async Task Run( [OrchestrationTrigger] IDurableOrchestrationContext context){ bool approved = await context .WaitForExternalEvent<bool>("Approval"); if (approved) { // approval granted - do the approved action } else { // approval denied - send a notification }}
Raise an external event
via orchestration client binding:
[FunctionName("ApprovalQueueProcessor")]public static async Task Run( [QueueTrigger("approval-queue")] string instanceId, [DurableClient] IDurableOrchestrationClient client){ await client .RaiseEventAsync(instanceId, "Approval", true);}
Raise an external event (2)
via HTTP API:
POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXXContent-Type: application/json
"true"
Orchestrator Function execution
[FunctionName("HelloCities")]public static async Task<List<string>> Run( [OrchestrationTrigger] IDurableOrchestrationContext context){ // Every time the orchestrator function is started or resumed // it will start from scratch at the beginning.
// Local variables will be created and assigned each time the function runs. var outputs = new List<string>(); var count = 0;
// When we await another function, the orchestrator stops the execution. // When the other function finishes, the orchestrator will run from the beginning // When it reaches the call of the executed function, the result will be used // instead of calling the function again // and the orchestrator function continues with the next steps. outputs.Add(await context .CallActivityAsync<string>("SayHello", "Tokyo")); count++;
return outputs;}
Orchestrator function constraints
- Orchestrator functions will be replayed multiple times from the beginning.
- Therefore orchestrator functions need to be deterministic - they need to produce the same result every time they are being called.
- We only can use deterministic APIs inside orchestrator functions to ensure this. A deterministic API is an API that always returns the same value given the same input.
- Things to avoid:
- DateTime.Now
- Guid.NewGuid()
- Random numbers
- Avoid using static variables and environment variables in orchestrator functions because their values can change over time, resulting in nondeterministic runtime behavior.
- Orchestrator code must never start any async operation except those defined by the orchestration trigger’s context object. For example, never use
Task.Run
,Task.Delay
, andHttpClient.SendAsync
.
Durable Function Patterns
The primary use case for Durable Functions is simplifying complex, stateful coordination requirements in serverless applications. We will have a look at typical application patterns that can benefit from Durable Functions.
Function chaining
A sequence of function has to execute in a specific order. The output of one function execution acts as input of the following function.
[FunctionName("Chaining")]public static async Task<object> Run( [OrchestrationTrigger] IDurableOrchestrationContext context){ try { var x = await context.CallActivityAsync<object>( "F1", null); var y = await context.CallActivityAsync<object>( "F2", x); var z = await context.CallActivityAsync<object>( "F3", y); return await context.CallActivityAsync<object>( "F4", z); } catch (Exception) { // Error handling }}
Fan out / fan in
Execute multiple functions in parallel and then wait for all functions to finish.
[FunctionName("FanOutFanIn")]public static async Task Run( [OrchestrationTrigger] IDurableOrchestrationContext context){ var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel. object[] workBatch = await context.CallActivityAsync<object[]>("F1", null); for (int i = 0; i < workBatch.Length; i++) { Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]); parallelTasks.Add(task); }
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3. int sum = parallelTasks.Sum(t => t.Result); await context.CallActivityAsync("F3", sum);}
Async HTTP APIs
When dealing with HTTP requests we sometimes have the need to kick off a long-running operation. We often deal with this by having one endpoint for starting the operation and one endpoint for receiving the status or result.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWorkHTTP/1.1 202 AcceptedContent-Type: application/jsonLocation: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79...
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask /instances/b79baf67f717453ca9e86c5da21e03ec -iHTTP/1.1 202 AcceptedContent-Type: application/jsonLocation: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79...
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask /instances/b79baf67f717453ca9e86c5da21e03ec -iHTTP/1.1 200 OKContent-Length: 175Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Monitor
Recurring checks of other services.
[FunctionName("MonitorJobStatus")]public static async Task Run( [OrchestrationTrigger] IDurableOrchestrationContext context){ int jobId = context.GetInput<int>(); int pollingInterval = GetPollingInterval(); DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime) { var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId); if (jobStatus == "Completed") { // Perform an action when a condition is met. await context.CallActivityAsync("SendAlert", jobId); break; }
// Orchestration sleeps until this time. var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval); await context.CreateTimer(nextCheck, CancellationToken.None); }}
Human interaction
Wait for events (often triggered by human interaction) that might occur much later.
[FunctionName("ApprovalWorkflow")]public static async Task Run( [OrchestrationTrigger] IDurableOrchestrationContext context){ await context.CallActivityAsync("RequestApproval", null); using (var timeoutCts = new CancellationTokenSource()) { DateTime dueTime = context.CurrentUtcDateTime.AddHours(72); Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent"); if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout)) { timeoutCts.Cancel(); await context.CallActivityAsync("ProcessApproval", approvalEvent.Result); } else { await context.CallActivityAsync("Escalate", null); } }}
Aggregator
Aggregate data over time in a single entity.
[FunctionName("Counter")]public static void Counter([EntityTrigger] IDurableEntityContext ctx){ int currentValue = ctx.GetState<int>(); switch (ctx.OperationName.ToLowerInvariant()) { case "add": int amount = ctx.GetInput<int>(); ctx.SetState(currentValue + amount); break; case "reset": ctx.SetState(0); break; case "get": ctx.Return(currentValue); break; }}
Alternative implementation:
public class Counter{ [JsonProperty("value")] public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))] public static Task Run( [EntityTrigger] IDurableEntityContext ctx) => ctx.DispatchAsync<Counter>();}
[FunctionName("EventHubTriggerCSharp")]public static async Task Run( [EventHubTrigger("device-sensor-events")] EventData eventData, [DurableClient] IDurableEntityClient entityClient){ var metricType = (string)eventData.Properties["metric"]; var delta = BitConverter.ToInt32( eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand. var entityId = new EntityId("Counter", metricType); await entityClient.SignalEntityAsync(entityId, "add", delta);}