Skip to content

Durable Functions

Durable Functions


Why Durable Functions?

When using Azure functions we usually deal with a strict stateless model. Each function execution is independent from other executions. This is a key factor why it is so easy to scale functions.

However using Azure functions we are limited:

  • An Azure function has to complete within a certain time frame (e.g. max 10min for Consumption plan)
  • We cannot model complex workflows or long-running operations.

Consider Durable Functions when

  • You need to coordinate the execution of multiple functions and manage complex control flow and error handling.
  • Your application requires stateful execution and the ability to pause, resume, or retry workflows.
  • Human interaction, such as approvals or notifications, is an integral part of your application’s workflow.

Building blocks

  • Orchestrator
    • Orchestrates the function execution.
    • Can call other functions. The output can be stored in local variables.
    • Can be long-running. Lifespan of an orchestrator function can be from milliseconds to never-ending.
  • Activity
    • Are the units of work within a durable functions app.
    • Are called by an orchestrator.
  • External Event
    • Are events triggered by any external client.
    • Supports bindings and HTTP API.

Orchestrator sample

[FunctionName("HelloCities")]
public static async Task<List<string>> Run(
[OrchestrationTrigger]
IDurableOrchestrationContext context)
{
var outputs = new List<string>();
outputs.Add(await context
.CallActivityAsync<string>("SayHello", "Tokyo"));
outputs.Add(await context
.CallActivityAsync<string>("SayHello", "Seattle"));
outputs.Add(await context
.CallActivityAsync<string>("SayHello", "London"));
return outputs;
}

Activity sample

[FunctionName("SayHello")]
public static string SayHello(
[ActivityTrigger] IDurableActivityContext helloContext)
{
string name = helloContext.GetInput<string>();
return $"Hello {name}!";
}

External event sample

[FunctionName("BudgetApproval")]
public static async Task Run(
[OrchestrationTrigger]
IDurableOrchestrationContext context)
{
bool approved = await context
.WaitForExternalEvent<bool>("Approval");
if (approved)
{
// approval granted - do the approved action
}
else
{
// approval denied - send a notification
}
}

Raise an external event

via orchestration client binding:

[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
[QueueTrigger("approval-queue")] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
await client
.RaiseEventAsync(instanceId, "Approval", true);
}

Raise an external event (2)

via HTTP API:

POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json
"true"

Orchestrator Function execution

[FunctionName("HelloCities")]
public static async Task<List<string>> Run(
[OrchestrationTrigger]
IDurableOrchestrationContext context)
{
// Every time the orchestrator function is started or resumed
// it will start from scratch at the beginning.
// Local variables will be created and assigned each time the function runs.
var outputs = new List<string>();
var count = 0;
// When we await another function, the orchestrator stops the execution.
// When the other function finishes, the orchestrator will run from the beginning
// When it reaches the call of the executed function, the result will be used
// instead of calling the function again
// and the orchestrator function continues with the next steps.
outputs.Add(await context
.CallActivityAsync<string>("SayHello", "Tokyo"));
count++;
return outputs;
}

Orchestrator function constraints

  • Orchestrator functions will be replayed multiple times from the beginning.
  • Therefore orchestrator functions need to be deterministic - they need to produce the same result every time they are being called.
  • We only can use deterministic APIs inside orchestrator functions to ensure this. A deterministic API is an API that always returns the same value given the same input.
  • Things to avoid:
    • DateTime.Now
    • Guid.NewGuid()
    • Random numbers
  • Avoid using static variables and environment variables in orchestrator functions because their values can change over time, resulting in nondeterministic runtime behavior.
  • Orchestrator code must never start any async operation except those defined by the orchestration trigger’s context object. For example, never use Task.RunTask.Delay, and HttpClient.SendAsync.

Durable Function Patterns

The primary use case for Durable Functions is simplifying complex, stateful coordination requirements in serverless applications. We will have a look at typical application patterns that can benefit from Durable Functions.


Function chaining

function-chaining.png

A sequence of function has to execute in a specific order. The output of one function execution acts as input of the following function.


[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger]
IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>(
"F1", null);
var y = await context.CallActivityAsync<object>(
"F2", x);
var z = await context.CallActivityAsync<object>(
"F3", y);
return await context.CallActivityAsync<object>(
"F4", z);
}
catch (Exception)
{
// Error handling
}
}

Fan out / fan in

fan-out-fan-in.png

Execute multiple functions in parallel and then wait for all functions to finish.


[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}

Async HTTP APIs

async-http-api.png

When dealing with HTTP requests we sometimes have the need to kick off a long-running operation. We often deal with this by having one endpoint for starting the operation and one endpoint for receiving the status or result.


Terminal window
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79...
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask
/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79...
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask
/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Monitor

monitor.png

Recurring checks of other services.


[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
}

Human interaction

approval.png Wait for events (often triggered by human interaction) that might occur much later.


[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}

Aggregator

aggregator.png

Aggregate data over time in a single entity.


[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}

Alternative implementation:

public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) =>
this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run(
[EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(
eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}