admin管理员组

文章数量:1242886

I have a problem with Masstransit and the Saga feature.

I cannot explain the following errors:

Error: MassTransit.ReceiveTransport[0]
      R-FAULT rabbitmq://localhost/monitoring-job-saga 489a0000-4100-0250-1852-08dd500596e7 Shared.MessagingContracts.JobSubmitted MainApp.MonitoringJobState(00:00:00.0334249)
      System.NotSupportedException: Send saga is no longer available in IoC
         at MassTransit.Configuration.RegistrationServiceCollectionExtensions.TempSagaRepository`1.Send[T](ConsumeContext`1 context, ISagaPolicy`2 policy, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/Configuration/RegistrationServiceCollectionExtensions.cs:Line 93
         at MassTransit.Middleware.CorrelatedSagaFilter`2.Send(ConsumeContext`1 context, IPipe`1 next)
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>()
        .InMemoryRepository();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUser);
            h.Password(rabbitPass);
        });

        cfg.UseDelayedMessageScheduler();

        cfg.ReceiveEndpoint(sagaQueue, e =>
        {
            e.StateMachineSaga(
                context.GetRequiredService<MonitoringJobStateMachine>(),
                context.GetRequiredService<ISagaRepository<MonitoringJobState>>());
        });
    });
});

MonitoringJobStateMachine-Class:

using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;

namespace MainApp
{
    public class MonitoringJobStateMachine : MassTransitStateMachine<MonitoringJobState>
    {
        public State Submitted { get; private set; } = null!;
        public State Processing { get; private set; } = null!;
        public State Completed { get; private set; } = null!;
        public State Failed { get; private set; } = null!;

        public Event<JobSubmitted> JobSubmittedEvent { get; private set; } = default!;
        public Event<JobCompleted> JobCompletedEvent { get; private set; } = default!;
        public Event<JobFailed> JobFailedEvent { get; private set; } = default!;

        public Schedule<MonitoringJobState, JobTimeout> JobTimeoutSchedule { get; private set; } = default!;

        public MonitoringJobStateMachine()
        {
            InstanceState(x => x.CurrentState);


            Event(() => JobSubmittedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
                x.InsertOnInitial = true;
            });

            Event(() => JobCompletedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            Event(() => JobFailedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            // Konfiguriere den Timeout-Scheduler
            Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
            {
                s.Delay = TimeSpan.FromSeconds(10);
                s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            // Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
            Initially(
                When(JobSubmittedEvent)
                    .ThenAsync(async context =>
                    {
                        // Werte aus der eingehenden Nachricht übernehmen
                        context.Saga.SubmittedAt = context.Message.Timestamp;
                        context.Saga.Regions = context.Message.Regions;
                        context.Saga.CurrentAttempt = 0;
                        if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
                            context.Saga.CurrentRegion = context.Saga.Regions[0];

                        Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");

                        // Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B. "jobs-de")
                        await context.Publish<ProcessJob>(new ProcessJobCommand
                        {
                            CorrelationId = context.Saga.CorrelationId,
                            Region = context.Saga.CurrentRegion,
                            Attempt = context.Saga.CurrentAttempt
                        }, publishContext =>
                        {
                            // Setzt explizit die Zieladresse für die Nachricht:
                            publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
                        });
                    })
                    // Plane den Timeout direkt in der Kette
                    .Schedule(JobTimeoutSchedule,
                        ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                        ctx => TimeSpan.FromSeconds(10))
                    .TransitionTo(Processing)
            );

            During(Processing,
                When(JobCompletedEvent)
                    .Then(ctx =>
                    {
                        Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
                    })
                    .Unschedule(JobTimeoutSchedule)
                    .TransitionTo(Completed),

                When(JobFailedEvent)
                    .ThenAsync(async ctx =>
                    {
                        Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
                        // Erhöhe den Versuchszähler
                        ctx.Saga.CurrentAttempt++;
                        if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
                        {
                            // Setze die neue Region
                            ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
                            Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");

                            // Sende den neuen ProcessJob-Befehl an die neue regionale Queue
                            await ctx.Publish<ProcessJob>(new ProcessJobCommand
                            {
                                CorrelationId = ctx.Saga.CorrelationId,
                                Region = ctx.Saga.CurrentRegion,
                                Attempt = ctx.Saga.CurrentAttempt
                            }, publishContext =>
                            {
                                publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
                            });
                        }
                    })
                    // Plane nach einem Fehlschlag (bzw. nach Timeout) einen neuen Timeout
                    .Schedule(JobTimeoutSchedule,
                        ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                        ctx => TimeSpan.FromSeconds(10))
                    .IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
                        binder => binder.TransitionTo(Processing),
                        binder => binder.TransitionTo(Failed)
                    ),

                When(JobTimeoutSchedule.Received)
                    .ThenAsync(async ctx =>
                    {
                        Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
                        // Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
                        ctx.Saga.CurrentAttempt++;
                        if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
                        {
                            ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
                            Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
                            await ctx.Publish<ProcessJob>(new ProcessJobCommand
                            {
                                CorrelationId = ctx.Saga.CorrelationId,
                                Region = ctx.Saga.CurrentRegion,
                                Attempt = ctx.Saga.CurrentAttempt
                            }, publishContext =>
                            {
                                publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
                            });
                        }
                    })
                    .Schedule(JobTimeoutSchedule,
                        ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                        ctx => TimeSpan.FromSeconds(10))
            );
        }
    }
}

I have a problem with Masstransit and the Saga feature.

I cannot explain the following errors:

Error: MassTransit.ReceiveTransport[0]
      R-FAULT rabbitmq://localhost/monitoring-job-saga 489a0000-4100-0250-1852-08dd500596e7 Shared.MessagingContracts.JobSubmitted MainApp.MonitoringJobState(00:00:00.0334249)
      System.NotSupportedException: Send saga is no longer available in IoC
         at MassTransit.Configuration.RegistrationServiceCollectionExtensions.TempSagaRepository`1.Send[T](ConsumeContext`1 context, ISagaPolicy`2 policy, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/Configuration/RegistrationServiceCollectionExtensions.cs:Line 93
         at MassTransit.Middleware.CorrelatedSagaFilter`2.Send(ConsumeContext`1 context, IPipe`1 next)
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>()
        .InMemoryRepository();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUser);
            h.Password(rabbitPass);
        });

        cfg.UseDelayedMessageScheduler();

        cfg.ReceiveEndpoint(sagaQueue, e =>
        {
            e.StateMachineSaga(
                context.GetRequiredService<MonitoringJobStateMachine>(),
                context.GetRequiredService<ISagaRepository<MonitoringJobState>>());
        });
    });
});

MonitoringJobStateMachine-Class:

using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;

namespace MainApp
{
    public class MonitoringJobStateMachine : MassTransitStateMachine<MonitoringJobState>
    {
        public State Submitted { get; private set; } = null!;
        public State Processing { get; private set; } = null!;
        public State Completed { get; private set; } = null!;
        public State Failed { get; private set; } = null!;

        public Event<JobSubmitted> JobSubmittedEvent { get; private set; } = default!;
        public Event<JobCompleted> JobCompletedEvent { get; private set; } = default!;
        public Event<JobFailed> JobFailedEvent { get; private set; } = default!;

        public Schedule<MonitoringJobState, JobTimeout> JobTimeoutSchedule { get; private set; } = default!;

        public MonitoringJobStateMachine()
        {
            InstanceState(x => x.CurrentState);


            Event(() => JobSubmittedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
                x.InsertOnInitial = true;
            });

            Event(() => JobCompletedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            Event(() => JobFailedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            // Konfiguriere den Timeout-Scheduler
            Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
            {
                s.Delay = TimeSpan.FromSeconds(10);
                s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            // Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
            Initially(
                When(JobSubmittedEvent)
                    .ThenAsync(async context =>
                    {
                        // Werte aus der eingehenden Nachricht übernehmen
                        context.Saga.SubmittedAt = context.Message.Timestamp;
                        context.Saga.Regions = context.Message.Regions;
                        context.Saga.CurrentAttempt = 0;
                        if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
                            context.Saga.CurrentRegion = context.Saga.Regions[0];

                        Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");

                        // Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B. "jobs-de")
                        await context.Publish<ProcessJob>(new ProcessJobCommand
                        {
                            CorrelationId = context.Saga.CorrelationId,
                            Region = context.Saga.CurrentRegion,
                            Attempt = context.Saga.CurrentAttempt
                        }, publishContext =>
                        {
                            // Setzt explizit die Zieladresse für die Nachricht:
                            publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
                        });
                    })
                    // Plane den Timeout direkt in der Kette
                    .Schedule(JobTimeoutSchedule,
                        ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                        ctx => TimeSpan.FromSeconds(10))
                    .TransitionTo(Processing)
            );

            During(Processing,
                When(JobCompletedEvent)
                    .Then(ctx =>
                    {
                        Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
                    })
                    .Unschedule(JobTimeoutSchedule)
                    .TransitionTo(Completed),

                When(JobFailedEvent)
                    .ThenAsync(async ctx =>
                    {
                        Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
                        // Erhöhe den Versuchszähler
                        ctx.Saga.CurrentAttempt++;
                        if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
                        {
                            // Setze die neue Region
                            ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
                            Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");

                            // Sende den neuen ProcessJob-Befehl an die neue regionale Queue
                            await ctx.Publish<ProcessJob>(new ProcessJobCommand
                            {
                                CorrelationId = ctx.Saga.CorrelationId,
                                Region = ctx.Saga.CurrentRegion,
                                Attempt = ctx.Saga.CurrentAttempt
                            }, publishContext =>
                            {
                                publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
                            });
                        }
                    })
                    // Plane nach einem Fehlschlag (bzw. nach Timeout) einen neuen Timeout
                    .Schedule(JobTimeoutSchedule,
                        ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                        ctx => TimeSpan.FromSeconds(10))
                    .IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
                        binder => binder.TransitionTo(Processing),
                        binder => binder.TransitionTo(Failed)
                    ),

                When(JobTimeoutSchedule.Received)
                    .ThenAsync(async ctx =>
                    {
                        Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
                        // Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
                        ctx.Saga.CurrentAttempt++;
                        if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
                        {
                            ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
                            Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
                            await ctx.Publish<ProcessJob>(new ProcessJobCommand
                            {
                                CorrelationId = ctx.Saga.CorrelationId,
                                Region = ctx.Saga.CurrentRegion,
                                Attempt = ctx.Saga.CurrentAttempt
                            }, publishContext =>
                            {
                                publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
                            });
                        }
                    })
                    .Schedule(JobTimeoutSchedule,
                        ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                        ctx => TimeSpan.FromSeconds(10))
            );
        }
    }
}
Share Improve this question edited 2 days ago Mark Rotteveel 109k227 gold badges156 silver badges220 bronze badges asked 2 days ago LUCKYONELUCKYONE 1
Add a comment  | 

1 Answer 1

Reset to default 0

Solution:

I had an error in the registration, which is why the error occurred.

Code:

        services.AddMassTransit(x =>
        {
            x.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>()
                    .RedisRepository(r =>
                    {
                        r.DatabaseConfiguration("localhost:6379"); // Redis-Server-Adresse
                        r.KeyPrefix = "saga:"; // Optional: Präfix für Redis-Schlüssel
                    });

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(new Uri(rabbitHost), h =>
                {
                    h.Username(rabbitUser);
                    h.Password(rabbitPass);
                });

                cfg.UseDelayedMessageScheduler();
                cfg.UseMessageScheduler(new Uri("queue:scheduler"));

                cfg.ConfigureEndpoints(context);
            });
        });

本文标签: cSend saga is not available in IoC anymoreStack Overflow