admin管理员组

文章数量:1241106

The what

When creating a MassTransit Consumer in an AWS Lambda function. On an exception, MassTransit does not seem to put the message on the _error queue as described in the MassTransit Exceptions Docs

The setup and code

Here is a snippet of the code, which is based on the MassTransit Example in Github

The Lambda Function:

public Function()
    {
        _environment = Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT") ?? "production";

        var services = LambdaConfiguration.ConfigureServices(_cache);
        
        _provider = services.BuildServiceProvider(true);
    }

    public async Task FunctionHandler(SQSEvent input, ILambdaContext context)
    {
        using var cts = new CancellationTokenSource(context.RemainingTime);
        using var scope = _provider.CreateScope();
        
        var factory = scope.ServiceProvider.GetRequiredService<IReceiveEndpointDispatcherFactory>();

        var ep = factory.CreateConsumerReceiver<HandleVehicleTelemetryConsumer>("static-name");
        
        var logger = scope.ServiceProvider.GetRequiredService<ILogger<Function>>();
        
        logger.LogInformation("Lambda function handler executed {@Environment}", _environment);
        
        var headers = new Dictionary<string, object>();
        try
        {
            foreach (var record in input.Records)
            {
                foreach (var key in record!.Attributes!.Keys)
                    headers[key] = record.Attributes[key];
        
                foreach (var key in record!.MessageAttributes!.Keys)
                    headers[key] = record.MessageAttributes[key];
        
                logger.LogInformation("Message received {Body}", record.Body);

                var body = Encoding.UTF8.GetBytes(record.Body);
        
                await ep.Dispatch(body, headers, cts.Token);
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Error while processing message");
            throw;
        }
        
        finally
        {
            await Log.CloseAndFlushAsync();
        }
    }

The Consumer where I am manually forcing an exception.

public class HandleVehicleTelemetryConsumer : IConsumer<Messaging.Contracts.HandleVehicleTelemetry>
{
    private readonly ILogger<HandleVehicleTelemetryConsumer> _logger;

    public HandleVehicleTelemetryConsumer(ILogger<HandleVehicleTelemetryConsumer> logger)
     {
         _logger = logger;
     }

    public Task Consume(ConsumeContext<Messaging.Contracts.HandleVehicleTelemetry> context)
    {
        _logger.LogInformation("Message received {@MessageId}", context.MessageId);

        throw new Exception("Something bad happened"); //<----- FORCING EXCEPTION HERE
        
        return Task.CompletedTask;
    }
}

So for an exception, I would have expected a new _error created like so:

And I do see my Exception in the error logs:

Another note, is that to get this Consumer to receive messages, I had to manually add the SQS trigger in the AWS console (not sure if that makes a difference)

I have tried setting this up as a .NET console app, and all is working as expected with MassTransit picking up the correct queue and putting messages in the _error queue for any exceptions without having to explicitly creating the queues.

But for a AWS Lambda Setup, it doesn't seem to be working? Am I missing something?

本文标签: cMassTransit in AWS Lambda SQS does not put message into error queue on ExceptionStack Overflow