admin管理员组

文章数量:1122832

i am trying implement Rabbit MQ consumer, that listens exchange 'XXX' and catches sport game event, but i have problem , some of messages from exchange are lost,and it seems that it is because of large amount of i/o operations in my class EventProcessor. How should i detect what's a problem: RabbitMQ incorrect configuration/work or i should optimize my processor class?

public class RabbitMqConsumer(IOptionsMonitor<RabbitMqSettings> options,
        IEventProcessorManager eventProcessorManager, IServiceProvider serviceProvider)
        : BackgroundService
    {
        private readonly RabbitMqSettings _settings = options.Get("RabbitMQConsumer");
        private IConnection? _connection;
        private IModel? _channel;

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = _settings.HostName,
                    UserName = _settings.UserName,
                    Password = _settings.Password,
                    VirtualHost = _settings.VirtualHost
                };

                _connection = factory.CreateConnection();
                _channel = _connection.CreateModel();

                _channel.QueueDeclarePassive(queue: _settings.QueueName);

                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += async (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);

                    Console.WriteLine(message);

                    var t = JsonConvert.DeserializeObject<QueueItemModel>(message);

                    if (t is not { Form: 2 })
                        return;
                    
                   await HandleShortenEventInfoAsync(t, stoppingToken);
                };

                _channel.BasicConsume(queue: _settings.QueueName,
                    autoAck: true,
                    consumer: consumer);
                
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Exception in RabbitMqConsumer.ExecuteAsync: {ex}");
               
            }
        }

        private async Task HandleShortenEventInfoAsync(QueueItemModel shortenEventInfo, CancellationToken stoppingToken)
        {
            try
            {
                using var scope = serviceProvider.CreateScope();
                var settingRepository = scope.ServiceProvider.GetRequiredService<ISettingRepository>();

                settingRepository.Create(new Setting
                {
                    Name = $"RMQ {DateTime.UtcNow}",
                    Value = shortenEventInfo.Data.ToString(),
                });
                await settingRepository.SaveChangesAsync(stoppingToken);
                if (shortenEventInfo.Type == "XXX")
                {
                    var eventItem =
                        JsonConvert.DeserializeObject<ShortFormMatchEventViewItem>(shortenEventInfo.Data.ToString());
                    if (eventItem != null && shortenEventInfo.DisciplineGId != null)
                    {
                        eventItem.DisciplineGlobalId = shortenEventInfo.DisciplineGId;
                        var eventProcessor = eventProcessorManager.GetOrCreateProcessor(
                            new GameKey(eventItem.DisciplineGlobalId, eventItem.GameId, eventItem.TournamentId),
                            stoppingToken);

                        await eventProcessor.ProcessEventAsync(eventItem, stoppingToken);
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }

        public override void Dispose()
        {
            if (_channel != null)
            {
                _channel.Close();
                _channel.Dispose();
            }
            if (_connection != null)
            {
                _connection.Close();
                _connection.Dispose();
            }
            base.Dispose();
        }
    }

this my consumer, i create on the fly each event processor for group of events, if the other hangfire job does not catch to create it. Here code of processor:

 public interface IEventProcessor
    {
        void SetGameKey(GameKey gameKey);
        void SetProviderHandler(IProviderHandler providerHandler);
        Task ProcessEventAsync(ShortFormMatchEventViewItem eventItem, CancellationToken cancellationToken);
      

        event EventHandler<GameKey> MatchEnded;
    }

    public sealed class EventProcessor
        : IEventProcessor
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly IHubContext<TalkActive, ITalkActiveClient> _hubContext;
        private GameKey? _gameKey;
        private IProviderHandler? _providerHandler;
        public event EventHandler<GameKey>? MatchEnded;
        private ShortFormMatchEventViewItem? _lastEventItem;
      

        public EventProcessor(IServiceProvider serviceProvider, IHubContext<TalkActive, ITalkActiveClient> hubContext)
        {
            _serviceProvider = serviceProvider;
            _hubContext = hubContext;


          
        }

        public void SetGameKey(GameKey gameKey)
        {
            _gameKey = gameKey;
        }

        public void SetProviderHandler(IProviderHandler? providerHandler)
        {
            _providerHandler = providerHandler;
        }


        public async Task ProcessEventAsync(ShortFormMatchEventViewItem eventItem, CancellationToken cancellationToken)
        {
            using var scope = _serviceProvider.CreateScope();
            var serviceProvider = scope.ServiceProvider;

            var eventRepository = serviceProvider.GetRequiredService<IEventRepository>();
            var sportFormRepository = serviceProvider.GetRequiredService<ISportFormRepository>();
            var settingRepository = serviceProvider.GetRequiredService<ISettingRepository>();

            var existingMatch = await GetEventAsync(eventRepository, settingRepository, cancellationToken);
            if (existingMatch != null)
            {
                eventItem.InnerTournamentId = existingMatch.TournamentId;
                var sportForm = await GetSportFormAsync(existingMatch.Id, sportFormRepository, cancellationToken);

                if (Enum.IsDefined(typeof(BaseMatchEvent), eventItem.Event))
                {
                    await UpdateStatusesAsync(eventItem, existingMatch, sportForm, eventRepository, sportFormRepository,
                        serviceProvider,
                        cancellationToken);
                }
              

                await SendShortGameInfoToGameListener(eventItem, settingRepository, cancellationToken);


                _lastEventItem = eventItem;
                if (sportForm is { TradingStatus: TradingStatus.PreMatch or TradingStatus.Live })
                {
                    _providerHandler ??= InitializeProviderHandler(serviceProvider);
                    var mathModelService = serviceProvider.GetRequiredService<IMathModelService>();
                    await mathModelService.ProcessMathModelForEventAsync(eventItem, existingMatch, sportForm,
                        _providerHandler, cancellationToken);
            
                }

                if (existingMatch.EventStatus == EventStatus.Finished &&
                    sportForm is { TradingStatus: TradingStatus.Finished })
                {
                    OnMatchEnded();
                }
            }
        }


        private void OnMatchEnded()
        {
            if (_gameKey != null)
            {
                MatchEnded?.Invoke(this, _gameKey);
            }
        }

        private IProviderHandler? InitializeProviderHandler(IServiceProvider serviceProvider)
        {
            if (_gameKey == null)
            {
                Console.WriteLine("GameKey is not set.");
                return null;
            }

            var sportFormRepository = serviceProvider.GetRequiredService<ISportFormRepository>();

            var existingSportForm = sportFormRepository.SportForms
                .Include(z => z.Event)
                .ThenInclude(z => z.Tournament)
                .SingleOrDefault(z =>
                    z.Event.DisciplineGlobalId == _gameKey.DisciplineGlobalId &&
                    z.Event.EventId == _gameKey.GameId &&
                    z.Event.Tournament.TournamentId == _gameKey.TournamentId);

            if (existingSportForm == null)
            {
                Console.WriteLine("Match not found.");
                return null;
            }

            var providerHandlers = serviceProvider.GetServices<IProviderHandler>();
            var providerHandler =
                providerHandlers.FirstOrDefault(h => h.SportFormType == existingSportForm.SportFormType);

            if (providerHandler != null) return providerHandler;

            Console.WriteLine($"No provider handler found for SportFormType {existingSportForm.SportFormType}");
            return null;
        }


        private async Task UpdateStatusesAsync(
            ShortFormMatchEventViewItem eventItem, Event existingMatch, SportForm? sportForm,
            IEventRepository eventRepository, ISportFormRepository sportFormRepository, IServiceProvider serviceProvider,
            CancellationToken cancellationToken)
        {
            _providerHandler ??= InitializeProviderHandler(serviceProvider);
            _providerHandler?.HandleBaseEvent(eventItem);
            if (existingMatch.EventStatus != eventItem.EventStatus)
            {
                existingMatch.EventStatus = eventItem.EventStatus;
                if (eventItem is {RoundIdx:0,Event: 1})
                {
                    existingMatch.StartDT = DateTime.UtcNow;
                    eventItem.StartDateTime = existingMatch.StartDT;
                }

                eventRepository.Update(existingMatch);
                await eventRepository.SaveChangesAsync(cancellationToken);
            }
            if (sportForm != null && sportForm.TradingStatus != eventItem.TradingStatus)
            {
                sportForm.TradingStatus = eventItem.TradingStatus;
                sportFormRepository.Update(sportForm);
                await sportFormRepository.SaveChangesAsync(cancellationToken);
            }
        }


        private async Task<Event?> GetEventAsync(IEventRepository eventRepository, ISettingRepository settingRepository,
            CancellationToken cancellationToken)
        {
            var existingMatch = await eventRepository.Events
                .Include(z => z.Tournament)
                .SingleOrDefaultAsync(
                    z =>
                        z.DisciplineGlobalId == _gameKey!.DisciplineGlobalId && z.EventId == _gameKey.GameId &&
                        z.Tournament.TournamentId == _gameKey.TournamentId, cancellationToken);

            if (existingMatch != null) return existingMatch;
            {
                var f = await settingRepository.Settings
                    .AnyAsync(
                        z => _gameKey != null && z.Name == "UnidentifiedEvent" && z.Value == _gameKey.GameId.ToString(),
                        cancellationToken);

                if (f) return existingMatch;

                settingRepository.Create(new Setting
                {
                    Name = "UnidentifiedEvent",
                    Value = _gameKey?.GameId.ToString() ?? "null"
                });
                await settingRepository.SaveChangesAsync(cancellationToken);
                return existingMatch;
            }
        }

        private async Task<SportForm?> GetSportFormAsync(int eventId, ISportFormRepository sportFormRepository,
            CancellationToken cancellationToken)
        {
            return await sportFormRepository.SportForms
                .Include(sf => sf.MarketGroups)
                .ThenInclude(m => m.Markets)
                .ThenInclude(mo => mo.Outcomes)
                .SingleOrDefaultAsync(sf => sf.EventId == eventId, cancellationToken);
        }


        private async Task SendShortGameInfoToGameListener(ShortFormMatchEventViewItem eventItem, ISettingRepository settingRepository,
            CancellationToken cancellationToken)
        {
            settingRepository.Create(new Setting
            {
                Name = $"SR {DateTime.UtcNow}",
                Value = JsonConvert.SerializeObject(eventItem)
            });
            await settingRepository.SaveChangesAsync(cancellationToken);
            var groupName = $"Event-{eventItem.DisciplineGlobalId}-{eventItem.TournamentId}-{eventItem.GameId}";
            await _hubContext.Clients.Group(groupName)
                .NewRabbitItem(eventItem, cancellationToken);
        }

    }

I try to manually acknowledge message, but it does not work

 _channel.BasicConsume(queue: _settings.QueueName,
            autoAck: false, // Use manual acknowledgment
            consumer: consumer);
        consumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            Console.WriteLine(message);

            try
            {
                var t = JsonConvert.DeserializeObject<QueueItemModel>(message);
                if (t is { Form: 2 })
                    await HandleShortenEventInfoAsync(t, CancellationToken.None);

                _channel.BasicAck(ea.DeliveryTag, false); // Acknowledge the message
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error processing message: {ex.Message}");
                _channel.BasicNack(ea.DeliveryTag, false, true); // Requeue the message
            }
        };

本文标签: cRabbitMQ consumer does no catch all messages from exchangeStack Overflow