admin管理员组

文章数量:1125049

I created a azure durable function in 6 here its code snippet

I'm facing two issues

  1. Orchestrator function wont complete despite all activity function execution completes without any exception
  2. sometime activity get executed multiple time

I observed console logs and found log.LogInformation("GenerateScheduledReports: All report generation tasks completed."); never executed nor its exception block.

Code:

[FunctionName("TimerTrigger")]
public static async Task TimerTrigger(
    [TimerTrigger("%ReportSchedule%", RunOnStartup = true)] TimerInfo timer,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = "OrchestratorFunction";

    // Check the status of the running orchestration
    var existingInstance = await starter.GetStatusAsync(instanceId);

    if (existingInstance != null && existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        TimeSpan runningTime = DateTime.Now - existingInstance.CreatedTime;

        if (runningTime.TotalMinutes > 30)
        {
            log.LogWarning($"Orchestrator with ID = '{instanceId}' is running for {runningTime.TotalMinutes} minutes. Terminating...");
            await starter.TerminateAsync(instanceId, "Orchestration running too long. Terminated.");
            log.LogInformation($"Orchestrator with ID = '{instanceId}' has been terminated.");
        }
        else
        {
            log.LogInformation($"Orchestrator with ID = '{instanceId}' is running for {runningTime.TotalMinutes} minutes. Letting it continue.");
            return; // Skip starting a new instance
        }
    }

    log.LogInformation("No active or acceptable orchestrator instance found. Starting a new one...");
    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    await starter.StartNewAsync("OrchestratorFunction", instanceId);
}

[FunctionName("OrchestratorFunction")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
    ILogger log)
{
    try
    {
        if (!context.IsReplaying)
        {
            log.LogInformation("OrchestratorFunction: START");
            
            await GenerateScheduledReports(context, log);

            log.LogInformation("OrchestratorFunction: END");
        }
    }
    catch (Exception ex)
    {
        log.LogError($"OrchestratorFunction: EXCEPTION : {ex.ToString()}");
    }
}

private static async Task GenerateScheduledReports(IDurableOrchestrationContext context, ILogger log)
{
    log.LogInformation("GenerateScheduledReports: START");
    List<Task> _listTasks = new List<Task>();
    var reportGenerationList = ReportSettingService.GetClientListForReportGeneration(log);

    foreach (var reportGeneration in reportGenerationList)
    {
        if (reportGeneration.ReportType == Constant.RECON_REPORT_TYPE_STRING)
        {
            _listTasks.Add(context.CallActivityAsync("GenerateReconciliationReport", reportGeneration));
        }
        else if (reportGeneration.ReportType == Constant.QTRREBATE_REPORT_TYPE_STRING)
        {
            _listTasks.Add(context.CallActivityAsync("GenerateQuarterlyRebateReport", reportGeneration));
        }
    }
    try
    {
        if (_listTasks.Any())
        {
            log.LogInformation("GenerateScheduledReports: Waiting for all report generation tasks to complete...");
            await Task.WhenAll(_listTasks);
            log.LogInformation("GenerateScheduledReports: All report generation tasks completed.");
        }
        else
        {
            log.LogWarning("GenerateScheduledReports: No tasks to process in GenerateReports.");
        }
    }
    catch (Exception ex)
    {
        log.LogError($"GenerateScheduledReports: EXCEPTION: {ex.Message}");
        //throw; // Re-throw to ensure orchestration logs the failure
    }
    log.LogInformation("GenerateScheduledReports: END");
}

#region -- Common activity functions used in scheduled and retry orchestrator functions --
[FunctionName("GenerateReconReport")]
public static async Task GenerateReconReport([ActivityTrigger] ReportGenerationModel reportGeneration, ExecutionContext executionContext, ILogger log)
{
    log.LogInformation($"Client [{reportGeneration.ClientId}-{reportGeneration.ReportType}] Step 1: Start Date {reportGeneration.InvoiceStartDate.GetValueOrDefault().ToString("MM/dd/yyyy")}, End Date {reportGeneration.InvoiceEndDate.GetValueOrDefault().ToString("MM/dd/yyyy")}");
    var reconReport = new ReconReport();
    await reconReport.GenerateReport(reportGeneration,
        executionContext,
        log);
}

[FunctionName("GenerateQuarterlyReport")]
public static async Task GenerateQuarterlyRebateReport([ActivityTrigger] ReportGenerationModel reportGeneration, ExecutionContext executionContext, ILogger log)
{
    log.LogInformation($"Client [{reportGeneration.ClientId}-{reportGeneration.ReportType}] Step 1: - Start Date {reportGeneration.InvoiceStartDate.GetValueOrDefault().ToString("MM/dd/yyyy")}, End Date {reportGeneration.InvoiceEndDate.GetValueOrDefault().ToString("MM/dd/yyyy")}");

    var quarterlyReport = new QuarterlyRebateStatementGenerateReport();
    await quarterlyReport.GenerateReport(reportGeneration,
                    executionContext,
                    log);
}
#endregion

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      },
      "enableLiveMetricsFilters": true
    }
  },
  "durableTask": {
    "maxConcurrentActivityFunctions": 30, // Concurrency not relevant for sequential execution
    "maxConcurrentOrchestratorFunctions": 1, // Only one orchestrator runs at a time
    "controlQueueBatchSize": 50,
    "taskHub": "DurableTaskHub",
    "storageProvider": {
      "connectionStringName": "BlobConnectionString", // Use the correct key name here
      "maxQueuePollingIntervalMs": 2000
    }
  },
  "functionTimeout": "02:00:00",
  "extensions": {
    "durableTask": {
      "hubName": "MyHubName"
    }
  }
}

I created a azure durable function in .net6 here its code snippet

I'm facing two issues

  1. Orchestrator function wont complete despite all activity function execution completes without any exception
  2. sometime activity get executed multiple time

I observed console logs and found log.LogInformation("GenerateScheduledReports: All report generation tasks completed."); never executed nor its exception block.

Code:

[FunctionName("TimerTrigger")]
public static async Task TimerTrigger(
    [TimerTrigger("%ReportSchedule%", RunOnStartup = true)] TimerInfo timer,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = "OrchestratorFunction";

    // Check the status of the running orchestration
    var existingInstance = await starter.GetStatusAsync(instanceId);

    if (existingInstance != null && existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        TimeSpan runningTime = DateTime.Now - existingInstance.CreatedTime;

        if (runningTime.TotalMinutes > 30)
        {
            log.LogWarning($"Orchestrator with ID = '{instanceId}' is running for {runningTime.TotalMinutes} minutes. Terminating...");
            await starter.TerminateAsync(instanceId, "Orchestration running too long. Terminated.");
            log.LogInformation($"Orchestrator with ID = '{instanceId}' has been terminated.");
        }
        else
        {
            log.LogInformation($"Orchestrator with ID = '{instanceId}' is running for {runningTime.TotalMinutes} minutes. Letting it continue.");
            return; // Skip starting a new instance
        }
    }

    log.LogInformation("No active or acceptable orchestrator instance found. Starting a new one...");
    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    await starter.StartNewAsync("OrchestratorFunction", instanceId);
}

[FunctionName("OrchestratorFunction")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
    ILogger log)
{
    try
    {
        if (!context.IsReplaying)
        {
            log.LogInformation("OrchestratorFunction: START");
            
            await GenerateScheduledReports(context, log);

            log.LogInformation("OrchestratorFunction: END");
        }
    }
    catch (Exception ex)
    {
        log.LogError($"OrchestratorFunction: EXCEPTION : {ex.ToString()}");
    }
}

private static async Task GenerateScheduledReports(IDurableOrchestrationContext context, ILogger log)
{
    log.LogInformation("GenerateScheduledReports: START");
    List<Task> _listTasks = new List<Task>();
    var reportGenerationList = ReportSettingService.GetClientListForReportGeneration(log);

    foreach (var reportGeneration in reportGenerationList)
    {
        if (reportGeneration.ReportType == Constant.RECON_REPORT_TYPE_STRING)
        {
            _listTasks.Add(context.CallActivityAsync("GenerateReconciliationReport", reportGeneration));
        }
        else if (reportGeneration.ReportType == Constant.QTRREBATE_REPORT_TYPE_STRING)
        {
            _listTasks.Add(context.CallActivityAsync("GenerateQuarterlyRebateReport", reportGeneration));
        }
    }
    try
    {
        if (_listTasks.Any())
        {
            log.LogInformation("GenerateScheduledReports: Waiting for all report generation tasks to complete...");
            await Task.WhenAll(_listTasks);
            log.LogInformation("GenerateScheduledReports: All report generation tasks completed.");
        }
        else
        {
            log.LogWarning("GenerateScheduledReports: No tasks to process in GenerateReports.");
        }
    }
    catch (Exception ex)
    {
        log.LogError($"GenerateScheduledReports: EXCEPTION: {ex.Message}");
        //throw; // Re-throw to ensure orchestration logs the failure
    }
    log.LogInformation("GenerateScheduledReports: END");
}

#region -- Common activity functions used in scheduled and retry orchestrator functions --
[FunctionName("GenerateReconReport")]
public static async Task GenerateReconReport([ActivityTrigger] ReportGenerationModel reportGeneration, ExecutionContext executionContext, ILogger log)
{
    log.LogInformation($"Client [{reportGeneration.ClientId}-{reportGeneration.ReportType}] Step 1: Start Date {reportGeneration.InvoiceStartDate.GetValueOrDefault().ToString("MM/dd/yyyy")}, End Date {reportGeneration.InvoiceEndDate.GetValueOrDefault().ToString("MM/dd/yyyy")}");
    var reconReport = new ReconReport();
    await reconReport.GenerateReport(reportGeneration,
        executionContext,
        log);
}

[FunctionName("GenerateQuarterlyReport")]
public static async Task GenerateQuarterlyRebateReport([ActivityTrigger] ReportGenerationModel reportGeneration, ExecutionContext executionContext, ILogger log)
{
    log.LogInformation($"Client [{reportGeneration.ClientId}-{reportGeneration.ReportType}] Step 1: - Start Date {reportGeneration.InvoiceStartDate.GetValueOrDefault().ToString("MM/dd/yyyy")}, End Date {reportGeneration.InvoiceEndDate.GetValueOrDefault().ToString("MM/dd/yyyy")}");

    var quarterlyReport = new QuarterlyRebateStatementGenerateReport();
    await quarterlyReport.GenerateReport(reportGeneration,
                    executionContext,
                    log);
}
#endregion

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      },
      "enableLiveMetricsFilters": true
    }
  },
  "durableTask": {
    "maxConcurrentActivityFunctions": 30, // Concurrency not relevant for sequential execution
    "maxConcurrentOrchestratorFunctions": 1, // Only one orchestrator runs at a time
    "controlQueueBatchSize": 50,
    "taskHub": "DurableTaskHub",
    "storageProvider": {
      "connectionStringName": "BlobConnectionString", // Use the correct key name here
      "maxQueuePollingIntervalMs": 2000
    }
  },
  "functionTimeout": "02:00:00",
  "extensions": {
    "durableTask": {
      "hubName": "MyHubName"
    }
  }
}
Share Improve this question asked 2 days ago Dark SDark S 3302 silver badges18 bronze badges 5
  • Can you share code of QuarterlyRebateStatementGenerateReport – Ikhtesam Afrin Commented 2 days ago
  • that code part executed completely as I added log at the end and i observed its get printed in console logs – Dark S Commented 2 days ago
  • Sounds like deadlock. What are the last outputs you are getting? (BTW: Microsoft discourages using interpolated strings for logging message templates.) – Fildor Commented 2 days ago
  • @Fildor it keep in running stage after all activities get completed and i crossed check results but the last logs which i set after try catch of orchestrator log.LogInformation("GenerateScheduledReports: END"); is not printed, i dont know if orchestrator keep waiting for all parallel activities task even after all activities completed. – Dark S Commented 2 days ago
  • i found some old issue thread in git github.com/Azure/azure-functions-durable-extension/issues/1377 but it wont understand if they suggested any solution for it – Dark S Commented 2 days ago
Add a comment  | 

1 Answer 1

Reset to default 2

Do not do this:

    try
    {
        if (!context.IsReplaying)
        {
            await GenerateScheduledReports(context, log);
        }
    }

You should just call the method:

    try
    {
        await GenerateScheduledReports(context, log);
    }

An important part of Durable Functions is that the orchestrator must be deterministic. This means it must do the same CallActivityAsync calls on each run when replaying. Your GenerateScheduledReports method looks ok. But if you don't want duplicate log messages due to replay, use the CreateReplaySafeLogger method on the context to wrap your logger so that it only logs each message when we are not replaying.

Note that it won't run the activity twice. Durable Functions skips the activity call and just resolves the Task with the result from the activity on replay.

The reason your code does not work is that after scheduling the activities, the orchestrator exits. Then once the activities finish, it replays the orchestrator and goes forward. But because you have an if block around the method call, it will never go there on replays.

本文标签: cAzure durable function stuckStack Overflow