admin管理员组

文章数量:1320773

Despite being registered as a singleton, it is instantiated multiple times instead of maintaining a single instance across the application. Could this be related to how the service is resolved or configured, particularly when interacting with the hosted service or a connection builder?

I'm building a framework in .NET Core 8 to register consumers for Azure Service Bus (in a similar way Masstransit works), but I'm encountering an issue where my ServiceBusManager singleton is not maintaining the processor registrations.

Github repo:

  1. I first register the consumers in the ServiceBusManager.AddConsumers and add them into the _processors list.

  2. Then I run a hosted service that should have the same instance of the ServiceManager and should run the method ServiceManager.StartProcessingAsync, but the _processors list is empty

Code:

builder.Services.AddSingleton<IServiceBusManager, ServiceBusManager>();
builder.Services.AddConsumerServiceBusConnection(x =>
    {
        var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
        var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
         x.AddConsumer<ShipmentCreatedConsumer>(topicName, subscriptionName);
         x.AddConsumer<TestConsumer>("testtopic", "testsuscription");
    });

Extension method:

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddConsumerServiceBusConnection(
        this IServiceCollection services,
        Action<ServiceBusConnectionBuilder> configure)
    {
        // Build the service provider to resolve the singleton instance
        using (var serviceProvider = services.BuildServiceProvider())
        {
            var serviceBusManager = serviceProvider.GetRequiredService<IServiceBusManager>();
            var builder = new ServiceBusConnectionBuilder(serviceBusManager);
            configure(builder);
        }
        return services;
    }
}

ServiceBusConnectionBuilder:

public class ServiceBusConnectionBuilder
{
    private readonly IServiceBusManager _serviceBusManager;

    public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
    {
        _serviceBusManager = serviceBusManager;
    }

    public ServiceBusConnectionBuilder AddConsumer<TConsumer>(string topicName, string suscrptionName) 
        where TConsumer : IServiceBusConsumer
    {
        _serviceBusManager.AddConsumer<TConsumer>(topicName, suscrptionName);
        return this;
    }
}

ServiceBusManager implementation:

public class ServiceBusManager : IServiceBusManager
{
    private readonly ServiceBusClient _serviceBusClient;
    private readonly IServiceProvider _serviceProvider;
    private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors;

    public ServiceBusManager(ServiceBusClient serviceBusClient,
        IServiceProvider serviceProvider,
        IOptions<ServiceBusSettings> serviceBusSettings)
    {
        _serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _processors = new ConcurrentDictionary<string, ServiceBusProcessor>();
    }

    public void AddConsumer<TConsumer>(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
    {
        if (string.IsNullOrWhiteSpace(topicName))
            throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));

        if (string.IsNullOrWhiteSpace(subscriptionName))
            throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));

        var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 1,       
            PrefetchCount = 10            
        });

        processor.ProcessMessageAsync += async args =>
        {
            using var scope = _serviceProvider.CreateAsyncScope();
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ProcessMessage(args);
        };

        processor.ProcessErrorAsync += args =>
        {
            using var scope = _serviceProvider.CreateAsyncScope();
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            return consumer.ProcessError(args);
        };

        if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
        {
            throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
        }
    }

    public async Task StartProcessingAsync(CancellationToken cancellationToken)
    {
        var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
        await Task.WhenAll(startTasks);
    }

    public async Task StopProcessingAsync()
    {
        var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
        await Task.WhenAll(stopTasks);
    }
}

Consumer interface:

public interface IServiceBusConsumer
{
    Task ProcessMessage(ProcessMessageEventArgs args);
    Task ProcessError(ProcessErrorEventArgs args);
}

Finally I have my hosted service

public class ServiceBusHostedService : CronJobServiceBase
{
    private readonly IServiceProvider _serviceProvider;
    private AsyncServiceScope _scope;
    private IHostedServiceTask _taskService;

    public ServiceBusHostedService(
        IOptions<ServiceBusHostedServiceSettings> hostedServiceSettings
        , ILogger<CronJobServiceBase> log,
        IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
    {
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
    {
        AppInsights.TrackTrace("Starting EventHubHostedService");
        _scope = _serviceProvider.CreateAsyncScope();
        _taskService = _scope.ServiceProvider.GetRequiredService<IEventServiceBusServiceTask>();
        await _taskService.StartAsync(cancellationToken);
    }

    protected override async Task DisposeScope()
    {
        await _taskService.StopAsync(CancellationToken.None);
        await _scope.DisposeAsync();
    }
}

public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
    private readonly IServiceBusManager _serviceBusManager;

    public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
    {
        _serviceBusManager = serviceBusManager;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _serviceBusManager.StartProcessingAsync(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _serviceBusManager.StopProcessingAsync();
    }
}

Despite being registered as a singleton, it is instantiated multiple times instead of maintaining a single instance across the application. Could this be related to how the service is resolved or configured, particularly when interacting with the hosted service or a connection builder?

I'm building a framework in .NET Core 8 to register consumers for Azure Service Bus (in a similar way Masstransit works), but I'm encountering an issue where my ServiceBusManager singleton is not maintaining the processor registrations.

Github repo: https://github/matvi/servicebusframework

  1. I first register the consumers in the ServiceBusManager.AddConsumers and add them into the _processors list.

  2. Then I run a hosted service that should have the same instance of the ServiceManager and should run the method ServiceManager.StartProcessingAsync, but the _processors list is empty

Code:

builder.Services.AddSingleton<IServiceBusManager, ServiceBusManager>();
builder.Services.AddConsumerServiceBusConnection(x =>
    {
        var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
        var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
         x.AddConsumer<ShipmentCreatedConsumer>(topicName, subscriptionName);
         x.AddConsumer<TestConsumer>("testtopic", "testsuscription");
    });

Extension method:

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddConsumerServiceBusConnection(
        this IServiceCollection services,
        Action<ServiceBusConnectionBuilder> configure)
    {
        // Build the service provider to resolve the singleton instance
        using (var serviceProvider = services.BuildServiceProvider())
        {
            var serviceBusManager = serviceProvider.GetRequiredService<IServiceBusManager>();
            var builder = new ServiceBusConnectionBuilder(serviceBusManager);
            configure(builder);
        }
        return services;
    }
}

ServiceBusConnectionBuilder:

public class ServiceBusConnectionBuilder
{
    private readonly IServiceBusManager _serviceBusManager;

    public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
    {
        _serviceBusManager = serviceBusManager;
    }

    public ServiceBusConnectionBuilder AddConsumer<TConsumer>(string topicName, string suscrptionName) 
        where TConsumer : IServiceBusConsumer
    {
        _serviceBusManager.AddConsumer<TConsumer>(topicName, suscrptionName);
        return this;
    }
}

ServiceBusManager implementation:

public class ServiceBusManager : IServiceBusManager
{
    private readonly ServiceBusClient _serviceBusClient;
    private readonly IServiceProvider _serviceProvider;
    private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors;

    public ServiceBusManager(ServiceBusClient serviceBusClient,
        IServiceProvider serviceProvider,
        IOptions<ServiceBusSettings> serviceBusSettings)
    {
        _serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _processors = new ConcurrentDictionary<string, ServiceBusProcessor>();
    }

    public void AddConsumer<TConsumer>(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
    {
        if (string.IsNullOrWhiteSpace(topicName))
            throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));

        if (string.IsNullOrWhiteSpace(subscriptionName))
            throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));

        var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 1,       
            PrefetchCount = 10            
        });

        processor.ProcessMessageAsync += async args =>
        {
            using var scope = _serviceProvider.CreateAsyncScope();
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ProcessMessage(args);
        };

        processor.ProcessErrorAsync += args =>
        {
            using var scope = _serviceProvider.CreateAsyncScope();
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            return consumer.ProcessError(args);
        };

        if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
        {
            throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
        }
    }

    public async Task StartProcessingAsync(CancellationToken cancellationToken)
    {
        var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
        await Task.WhenAll(startTasks);
    }

    public async Task StopProcessingAsync()
    {
        var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
        await Task.WhenAll(stopTasks);
    }
}

Consumer interface:

public interface IServiceBusConsumer
{
    Task ProcessMessage(ProcessMessageEventArgs args);
    Task ProcessError(ProcessErrorEventArgs args);
}

Finally I have my hosted service

public class ServiceBusHostedService : CronJobServiceBase
{
    private readonly IServiceProvider _serviceProvider;
    private AsyncServiceScope _scope;
    private IHostedServiceTask _taskService;

    public ServiceBusHostedService(
        IOptions<ServiceBusHostedServiceSettings> hostedServiceSettings
        , ILogger<CronJobServiceBase> log,
        IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
    {
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
    {
        AppInsights.TrackTrace("Starting EventHubHostedService");
        _scope = _serviceProvider.CreateAsyncScope();
        _taskService = _scope.ServiceProvider.GetRequiredService<IEventServiceBusServiceTask>();
        await _taskService.StartAsync(cancellationToken);
    }

    protected override async Task DisposeScope()
    {
        await _taskService.StopAsync(CancellationToken.None);
        await _scope.DisposeAsync();
    }
}

public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
    private readonly IServiceBusManager _serviceBusManager;

    public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
    {
        _serviceBusManager = serviceBusManager;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _serviceBusManager.StartProcessingAsync(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _serviceBusManager.StopProcessingAsync();
    }
}
Share Improve this question edited Jan 18 at 7:18 marc_s 756k184 gold badges1.4k silver badges1.5k bronze badges asked Jan 17 at 22:26 MatviMatvi 1453 silver badges12 bronze badges 3
  • I created a github repository in case you want to clone it and review github/matvi/servicebusframework – Matvi Commented Jan 18 at 0:31
  • 3 Why? - because you are building service provider multiple times (via var serviceProvider = services.BuildServiceProvider() + when the host is build), that is exactly one of the main reasons why you should not do it. – Guru Stron Commented Jan 18 at 6:37
  • 1 @GuruStron can you provide an answer to fix it? – Matvi Commented Jan 19 at 7:06
Add a comment  | 

1 Answer 1

Reset to default 1

I fixed it by removing the responsibility of creating my IServiceBusManager from the IoC.

To check full implementation check the branch feature/ManualSingleton in my github repo https://github/matvi/servicebusframework

    public class ServiceBusManager : IServiceBusManager
    {
        private static readonly Lazy<ServiceBusManager> _instance = new Lazy<ServiceBusManager>(() => new ServiceBusManager());
        private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors;

        public ServiceBusManager()
        {
            _processors = new ConcurrentDictionary<string, ServiceBusProcessor>();
        }

        public static ServiceBusManager Instance => _instance.Value;

    }

Added the IConfiguration to the AddConsumerServiceBusConnection

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddConsumerServiceBusConnection(
        this IServiceCollection services,
        IConfiguration configuration,
        Action<ServiceBusConnectionBuilder> configure)
    {
        var serviceProvider = services.BuildServiceProvider();
        var builder = new ServiceBusConnectionBuilder(ServiceBusManager.Instance, serviceProvider, configuration);
        configure(builder);
        return services;
    }
}

Use the singleton IServiceBusManager instead of the Ioc

public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
    private readonly IServiceBusManager _serviceBusManager;

    public EventServiceBusServiceTask()
    {
        _serviceBusManager = ServiceBusManager.Instance;
    }
}

Finally change the ServiceBusConnectionBuilder

public class ServiceBusConnectionBuilder
{
       private readonly IServiceBusManager _serviceBusManager;
       private readonly IServiceProvider _serviceProvider;
       private readonly IConfiguration _configuration;

       public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager,IServiceProvider serviceProvider, IConfiguration configuration)
    {
           _serviceBusManager = serviceBusManager;
           _serviceProvider = serviceProvider;
           _configuration = configuration;
       }

    public ServiceBusConnectionBuilder AddConsumer<TConsumer>(string topicName, string suscrptionName) 
        where TConsumer : IServiceBusConsumer
    {
           var connectinStringServiceBus = _configuration["ServiceBusSettings:ConnectionString"];
           var serviceBusClient = new ServiceBusClient(connectinStringServiceBus);
           _serviceBusManager.AddConsumer<TConsumer>(topicName, suscrptionName, serviceBusClient, _serviceProvider);
        return this;
    }
}

本文标签: