admin管理员组文章数量:1320773
Despite being registered as a singleton, it is instantiated multiple times instead of maintaining a single instance across the application. Could this be related to how the service is resolved or configured, particularly when interacting with the hosted service or a connection builder?
I'm building a framework in .NET Core 8 to register consumers for Azure Service Bus (in a similar way Masstransit works), but I'm encountering an issue where my ServiceBusManager
singleton is not maintaining the processor registrations.
Github repo:
I first register the consumers in the
ServiceBusManager.AddConsumers
and add them into the_processors
list.Then I run a hosted service that should have the same instance of the
ServiceManager
and should run the methodServiceManager.StartProcessingAsync
, but the_processors
list is empty
Code:
builder.Services.AddSingleton<IServiceBusManager, ServiceBusManager>();
builder.Services.AddConsumerServiceBusConnection(x =>
{
var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
x.AddConsumer<ShipmentCreatedConsumer>(topicName, subscriptionName);
x.AddConsumer<TestConsumer>("testtopic", "testsuscription");
});
Extension method:
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddConsumerServiceBusConnection(
this IServiceCollection services,
Action<ServiceBusConnectionBuilder> configure)
{
// Build the service provider to resolve the singleton instance
using (var serviceProvider = services.BuildServiceProvider())
{
var serviceBusManager = serviceProvider.GetRequiredService<IServiceBusManager>();
var builder = new ServiceBusConnectionBuilder(serviceBusManager);
configure(builder);
}
return services;
}
}
ServiceBusConnectionBuilder
:
public class ServiceBusConnectionBuilder
{
private readonly IServiceBusManager _serviceBusManager;
public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public ServiceBusConnectionBuilder AddConsumer<TConsumer>(string topicName, string suscrptionName)
where TConsumer : IServiceBusConsumer
{
_serviceBusManager.AddConsumer<TConsumer>(topicName, suscrptionName);
return this;
}
}
ServiceBusManager
implementation:
public class ServiceBusManager : IServiceBusManager
{
private readonly ServiceBusClient _serviceBusClient;
private readonly IServiceProvider _serviceProvider;
private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors;
public ServiceBusManager(ServiceBusClient serviceBusClient,
IServiceProvider serviceProvider,
IOptions<ServiceBusSettings> serviceBusSettings)
{
_serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_processors = new ConcurrentDictionary<string, ServiceBusProcessor>();
}
public void AddConsumer<TConsumer>(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
{
if (string.IsNullOrWhiteSpace(topicName))
throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));
if (string.IsNullOrWhiteSpace(subscriptionName))
throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));
var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 10
});
processor.ProcessMessageAsync += async args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ProcessMessage(args);
};
processor.ProcessErrorAsync += args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
return consumer.ProcessError(args);
};
if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
{
throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
}
}
public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
await Task.WhenAll(startTasks);
}
public async Task StopProcessingAsync()
{
var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
await Task.WhenAll(stopTasks);
}
}
Consumer interface:
public interface IServiceBusConsumer
{
Task ProcessMessage(ProcessMessageEventArgs args);
Task ProcessError(ProcessErrorEventArgs args);
}
Finally I have my hosted service
public class ServiceBusHostedService : CronJobServiceBase
{
private readonly IServiceProvider _serviceProvider;
private AsyncServiceScope _scope;
private IHostedServiceTask _taskService;
public ServiceBusHostedService(
IOptions<ServiceBusHostedServiceSettings> hostedServiceSettings
, ILogger<CronJobServiceBase> log,
IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
{
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
AppInsights.TrackTrace("Starting EventHubHostedService");
_scope = _serviceProvider.CreateAsyncScope();
_taskService = _scope.ServiceProvider.GetRequiredService<IEventServiceBusServiceTask>();
await _taskService.StartAsync(cancellationToken);
}
protected override async Task DisposeScope()
{
await _taskService.StopAsync(CancellationToken.None);
await _scope.DisposeAsync();
}
}
public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
private readonly IServiceBusManager _serviceBusManager;
public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StartProcessingAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StopProcessingAsync();
}
}
Despite being registered as a singleton, it is instantiated multiple times instead of maintaining a single instance across the application. Could this be related to how the service is resolved or configured, particularly when interacting with the hosted service or a connection builder?
I'm building a framework in .NET Core 8 to register consumers for Azure Service Bus (in a similar way Masstransit works), but I'm encountering an issue where my ServiceBusManager
singleton is not maintaining the processor registrations.
Github repo: https://github/matvi/servicebusframework
I first register the consumers in the
ServiceBusManager.AddConsumers
and add them into the_processors
list.Then I run a hosted service that should have the same instance of the
ServiceManager
and should run the methodServiceManager.StartProcessingAsync
, but the_processors
list is empty
Code:
builder.Services.AddSingleton<IServiceBusManager, ServiceBusManager>();
builder.Services.AddConsumerServiceBusConnection(x =>
{
var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
x.AddConsumer<ShipmentCreatedConsumer>(topicName, subscriptionName);
x.AddConsumer<TestConsumer>("testtopic", "testsuscription");
});
Extension method:
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddConsumerServiceBusConnection(
this IServiceCollection services,
Action<ServiceBusConnectionBuilder> configure)
{
// Build the service provider to resolve the singleton instance
using (var serviceProvider = services.BuildServiceProvider())
{
var serviceBusManager = serviceProvider.GetRequiredService<IServiceBusManager>();
var builder = new ServiceBusConnectionBuilder(serviceBusManager);
configure(builder);
}
return services;
}
}
ServiceBusConnectionBuilder
:
public class ServiceBusConnectionBuilder
{
private readonly IServiceBusManager _serviceBusManager;
public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public ServiceBusConnectionBuilder AddConsumer<TConsumer>(string topicName, string suscrptionName)
where TConsumer : IServiceBusConsumer
{
_serviceBusManager.AddConsumer<TConsumer>(topicName, suscrptionName);
return this;
}
}
ServiceBusManager
implementation:
public class ServiceBusManager : IServiceBusManager
{
private readonly ServiceBusClient _serviceBusClient;
private readonly IServiceProvider _serviceProvider;
private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors;
public ServiceBusManager(ServiceBusClient serviceBusClient,
IServiceProvider serviceProvider,
IOptions<ServiceBusSettings> serviceBusSettings)
{
_serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_processors = new ConcurrentDictionary<string, ServiceBusProcessor>();
}
public void AddConsumer<TConsumer>(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
{
if (string.IsNullOrWhiteSpace(topicName))
throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));
if (string.IsNullOrWhiteSpace(subscriptionName))
throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));
var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 10
});
processor.ProcessMessageAsync += async args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ProcessMessage(args);
};
processor.ProcessErrorAsync += args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
return consumer.ProcessError(args);
};
if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
{
throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
}
}
public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
await Task.WhenAll(startTasks);
}
public async Task StopProcessingAsync()
{
var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
await Task.WhenAll(stopTasks);
}
}
Consumer interface:
public interface IServiceBusConsumer
{
Task ProcessMessage(ProcessMessageEventArgs args);
Task ProcessError(ProcessErrorEventArgs args);
}
Finally I have my hosted service
public class ServiceBusHostedService : CronJobServiceBase
{
private readonly IServiceProvider _serviceProvider;
private AsyncServiceScope _scope;
private IHostedServiceTask _taskService;
public ServiceBusHostedService(
IOptions<ServiceBusHostedServiceSettings> hostedServiceSettings
, ILogger<CronJobServiceBase> log,
IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
{
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
AppInsights.TrackTrace("Starting EventHubHostedService");
_scope = _serviceProvider.CreateAsyncScope();
_taskService = _scope.ServiceProvider.GetRequiredService<IEventServiceBusServiceTask>();
await _taskService.StartAsync(cancellationToken);
}
protected override async Task DisposeScope()
{
await _taskService.StopAsync(CancellationToken.None);
await _scope.DisposeAsync();
}
}
public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
private readonly IServiceBusManager _serviceBusManager;
public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StartProcessingAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StopProcessingAsync();
}
}
Share
Improve this question
edited Jan 18 at 7:18
marc_s
756k184 gold badges1.4k silver badges1.5k bronze badges
asked Jan 17 at 22:26
MatviMatvi
1453 silver badges12 bronze badges
3
|
1 Answer
Reset to default 1I fixed it by removing the responsibility of creating my IServiceBusManager from the IoC.
To check full implementation check the branch feature/ManualSingleton in my github repo https://github/matvi/servicebusframework
public class ServiceBusManager : IServiceBusManager
{
private static readonly Lazy<ServiceBusManager> _instance = new Lazy<ServiceBusManager>(() => new ServiceBusManager());
private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors;
public ServiceBusManager()
{
_processors = new ConcurrentDictionary<string, ServiceBusProcessor>();
}
public static ServiceBusManager Instance => _instance.Value;
}
Added the IConfiguration to the AddConsumerServiceBusConnection
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddConsumerServiceBusConnection(
this IServiceCollection services,
IConfiguration configuration,
Action<ServiceBusConnectionBuilder> configure)
{
var serviceProvider = services.BuildServiceProvider();
var builder = new ServiceBusConnectionBuilder(ServiceBusManager.Instance, serviceProvider, configuration);
configure(builder);
return services;
}
}
Use the singleton IServiceBusManager instead of the Ioc
public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
private readonly IServiceBusManager _serviceBusManager;
public EventServiceBusServiceTask()
{
_serviceBusManager = ServiceBusManager.Instance;
}
}
Finally change the ServiceBusConnectionBuilder
public class ServiceBusConnectionBuilder
{
private readonly IServiceBusManager _serviceBusManager;
private readonly IServiceProvider _serviceProvider;
private readonly IConfiguration _configuration;
public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager,IServiceProvider serviceProvider, IConfiguration configuration)
{
_serviceBusManager = serviceBusManager;
_serviceProvider = serviceProvider;
_configuration = configuration;
}
public ServiceBusConnectionBuilder AddConsumer<TConsumer>(string topicName, string suscrptionName)
where TConsumer : IServiceBusConsumer
{
var connectinStringServiceBus = _configuration["ServiceBusSettings:ConnectionString"];
var serviceBusClient = new ServiceBusClient(connectinStringServiceBus);
_serviceBusManager.AddConsumer<TConsumer>(topicName, suscrptionName, serviceBusClient, _serviceProvider);
return this;
}
}
本文标签:
版权声明:本文标题:c# - Why is my singleton service being instanciated multiple times in .NET 8 when using hosted services? - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1742088543a2420126.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
var serviceProvider = services.BuildServiceProvider()
+ when the host is build), that is exactly one of the main reasons why you should not do it. – Guru Stron Commented Jan 18 at 6:37