admin管理员组文章数量:1242886
I have a problem with Masstransit and the Saga feature.
I cannot explain the following errors:
Error: MassTransit.ReceiveTransport[0]
R-FAULT rabbitmq://localhost/monitoring-job-saga 489a0000-4100-0250-1852-08dd500596e7 Shared.MessagingContracts.JobSubmitted MainApp.MonitoringJobState(00:00:00.0334249)
System.NotSupportedException: Send saga is no longer available in IoC
at MassTransit.Configuration.RegistrationServiceCollectionExtensions.TempSagaRepository`1.Send[T](ConsumeContext`1 context, ISagaPolicy`2 policy, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/Configuration/RegistrationServiceCollectionExtensions.cs:Line 93
at MassTransit.Middleware.CorrelatedSagaFilter`2.Send(ConsumeContext`1 context, IPipe`1 next)
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>()
.InMemoryRepository();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUser);
h.Password(rabbitPass);
});
cfg.UseDelayedMessageScheduler();
cfg.ReceiveEndpoint(sagaQueue, e =>
{
e.StateMachineSaga(
context.GetRequiredService<MonitoringJobStateMachine>(),
context.GetRequiredService<ISagaRepository<MonitoringJobState>>());
});
});
});
MonitoringJobStateMachine-Class:
using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;
namespace MainApp
{
public class MonitoringJobStateMachine : MassTransitStateMachine<MonitoringJobState>
{
public State Submitted { get; private set; } = null!;
public State Processing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public Event<JobSubmitted> JobSubmittedEvent { get; private set; } = default!;
public Event<JobCompleted> JobCompletedEvent { get; private set; } = default!;
public Event<JobFailed> JobFailedEvent { get; private set; } = default!;
public Schedule<MonitoringJobState, JobTimeout> JobTimeoutSchedule { get; private set; } = default!;
public MonitoringJobStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => JobSubmittedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
x.InsertOnInitial = true;
});
Event(() => JobCompletedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
Event(() => JobFailedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Konfiguriere den Timeout-Scheduler
Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(10);
s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
Initially(
When(JobSubmittedEvent)
.ThenAsync(async context =>
{
// Werte aus der eingehenden Nachricht übernehmen
context.Saga.SubmittedAt = context.Message.Timestamp;
context.Saga.Regions = context.Message.Regions;
context.Saga.CurrentAttempt = 0;
if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
context.Saga.CurrentRegion = context.Saga.Regions[0];
Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");
// Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B. "jobs-de")
await context.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = context.Saga.CorrelationId,
Region = context.Saga.CurrentRegion,
Attempt = context.Saga.CurrentAttempt
}, publishContext =>
{
// Setzt explizit die Zieladresse für die Nachricht:
publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
});
})
// Plane den Timeout direkt in der Kette
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.TransitionTo(Processing)
);
During(Processing,
When(JobCompletedEvent)
.Then(ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
})
.Unschedule(JobTimeoutSchedule)
.TransitionTo(Completed),
When(JobFailedEvent)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
// Erhöhe den Versuchszähler
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
// Setze die neue Region
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
// Sende den neuen ProcessJob-Befehl an die neue regionale Queue
await ctx.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
// Plane nach einem Fehlschlag (bzw. nach Timeout) einen neuen Timeout
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
binder => binder.TransitionTo(Processing),
binder => binder.TransitionTo(Failed)
),
When(JobTimeoutSchedule.Received)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
// Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
await ctx.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
);
}
}
}
I have a problem with Masstransit and the Saga feature.
I cannot explain the following errors:
Error: MassTransit.ReceiveTransport[0]
R-FAULT rabbitmq://localhost/monitoring-job-saga 489a0000-4100-0250-1852-08dd500596e7 Shared.MessagingContracts.JobSubmitted MainApp.MonitoringJobState(00:00:00.0334249)
System.NotSupportedException: Send saga is no longer available in IoC
at MassTransit.Configuration.RegistrationServiceCollectionExtensions.TempSagaRepository`1.Send[T](ConsumeContext`1 context, ISagaPolicy`2 policy, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/Configuration/RegistrationServiceCollectionExtensions.cs:Line 93
at MassTransit.Middleware.CorrelatedSagaFilter`2.Send(ConsumeContext`1 context, IPipe`1 next)
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>()
.InMemoryRepository();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUser);
h.Password(rabbitPass);
});
cfg.UseDelayedMessageScheduler();
cfg.ReceiveEndpoint(sagaQueue, e =>
{
e.StateMachineSaga(
context.GetRequiredService<MonitoringJobStateMachine>(),
context.GetRequiredService<ISagaRepository<MonitoringJobState>>());
});
});
});
MonitoringJobStateMachine-Class:
using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;
namespace MainApp
{
public class MonitoringJobStateMachine : MassTransitStateMachine<MonitoringJobState>
{
public State Submitted { get; private set; } = null!;
public State Processing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public Event<JobSubmitted> JobSubmittedEvent { get; private set; } = default!;
public Event<JobCompleted> JobCompletedEvent { get; private set; } = default!;
public Event<JobFailed> JobFailedEvent { get; private set; } = default!;
public Schedule<MonitoringJobState, JobTimeout> JobTimeoutSchedule { get; private set; } = default!;
public MonitoringJobStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => JobSubmittedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
x.InsertOnInitial = true;
});
Event(() => JobCompletedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
Event(() => JobFailedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Konfiguriere den Timeout-Scheduler
Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(10);
s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
Initially(
When(JobSubmittedEvent)
.ThenAsync(async context =>
{
// Werte aus der eingehenden Nachricht übernehmen
context.Saga.SubmittedAt = context.Message.Timestamp;
context.Saga.Regions = context.Message.Regions;
context.Saga.CurrentAttempt = 0;
if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
context.Saga.CurrentRegion = context.Saga.Regions[0];
Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");
// Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B. "jobs-de")
await context.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = context.Saga.CorrelationId,
Region = context.Saga.CurrentRegion,
Attempt = context.Saga.CurrentAttempt
}, publishContext =>
{
// Setzt explizit die Zieladresse für die Nachricht:
publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
});
})
// Plane den Timeout direkt in der Kette
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.TransitionTo(Processing)
);
During(Processing,
When(JobCompletedEvent)
.Then(ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
})
.Unschedule(JobTimeoutSchedule)
.TransitionTo(Completed),
When(JobFailedEvent)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
// Erhöhe den Versuchszähler
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
// Setze die neue Region
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
// Sende den neuen ProcessJob-Befehl an die neue regionale Queue
await ctx.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
// Plane nach einem Fehlschlag (bzw. nach Timeout) einen neuen Timeout
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
binder => binder.TransitionTo(Processing),
binder => binder.TransitionTo(Failed)
),
When(JobTimeoutSchedule.Received)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
// Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
await ctx.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
);
}
}
}
Share
Improve this question
edited 2 days ago
Mark Rotteveel
109k227 gold badges156 silver badges220 bronze badges
asked 2 days ago
LUCKYONELUCKYONE
1
1 Answer
Reset to default 0Solution:
I had an error in the registration, which is why the error occurred.
Code:
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>()
.RedisRepository(r =>
{
r.DatabaseConfiguration("localhost:6379"); // Redis-Server-Adresse
r.KeyPrefix = "saga:"; // Optional: Präfix für Redis-Schlüssel
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUser);
h.Password(rabbitPass);
});
cfg.UseDelayedMessageScheduler();
cfg.UseMessageScheduler(new Uri("queue:scheduler"));
cfg.ConfigureEndpoints(context);
});
});
本文标签: cSend saga is not available in IoC anymoreStack Overflow
版权声明:本文标题:c# - Send saga is not available in IoC anymore - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1740097448a2224293.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论