admin管理员组文章数量:1122832
i am trying implement Rabbit MQ consumer, that listens exchange 'XXX' and catches sport game event, but i have problem , some of messages from exchange are lost,and it seems that it is because of large amount of i/o operations in my class EventProcessor. How should i detect what's a problem: RabbitMQ incorrect configuration/work or i should optimize my processor class?
public class RabbitMqConsumer(IOptionsMonitor<RabbitMqSettings> options,
IEventProcessorManager eventProcessorManager, IServiceProvider serviceProvider)
: BackgroundService
{
private readonly RabbitMqSettings _settings = options.Get("RabbitMQConsumer");
private IConnection? _connection;
private IModel? _channel;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
var factory = new ConnectionFactory()
{
HostName = _settings.HostName,
UserName = _settings.UserName,
Password = _settings.Password,
VirtualHost = _settings.VirtualHost
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclarePassive(queue: _settings.QueueName);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
var t = JsonConvert.DeserializeObject<QueueItemModel>(message);
if (t is not { Form: 2 })
return;
await HandleShortenEventInfoAsync(t, stoppingToken);
};
_channel.BasicConsume(queue: _settings.QueueName,
autoAck: true,
consumer: consumer);
await Task.Delay(Timeout.Infinite, stoppingToken);
}
catch (Exception ex)
{
Console.WriteLine($"Exception in RabbitMqConsumer.ExecuteAsync: {ex}");
}
}
private async Task HandleShortenEventInfoAsync(QueueItemModel shortenEventInfo, CancellationToken stoppingToken)
{
try
{
using var scope = serviceProvider.CreateScope();
var settingRepository = scope.ServiceProvider.GetRequiredService<ISettingRepository>();
settingRepository.Create(new Setting
{
Name = $"RMQ {DateTime.UtcNow}",
Value = shortenEventInfo.Data.ToString(),
});
await settingRepository.SaveChangesAsync(stoppingToken);
if (shortenEventInfo.Type == "XXX")
{
var eventItem =
JsonConvert.DeserializeObject<ShortFormMatchEventViewItem>(shortenEventInfo.Data.ToString());
if (eventItem != null && shortenEventInfo.DisciplineGId != null)
{
eventItem.DisciplineGlobalId = shortenEventInfo.DisciplineGId;
var eventProcessor = eventProcessorManager.GetOrCreateProcessor(
new GameKey(eventItem.DisciplineGlobalId, eventItem.GameId, eventItem.TournamentId),
stoppingToken);
await eventProcessor.ProcessEventAsync(eventItem, stoppingToken);
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
public override void Dispose()
{
if (_channel != null)
{
_channel.Close();
_channel.Dispose();
}
if (_connection != null)
{
_connection.Close();
_connection.Dispose();
}
base.Dispose();
}
}
this my consumer, i create on the fly each event processor for group of events, if the other hangfire job does not catch to create it. Here code of processor:
public interface IEventProcessor
{
void SetGameKey(GameKey gameKey);
void SetProviderHandler(IProviderHandler providerHandler);
Task ProcessEventAsync(ShortFormMatchEventViewItem eventItem, CancellationToken cancellationToken);
event EventHandler<GameKey> MatchEnded;
}
public sealed class EventProcessor
: IEventProcessor
{
private readonly IServiceProvider _serviceProvider;
private readonly IHubContext<TalkActive, ITalkActiveClient> _hubContext;
private GameKey? _gameKey;
private IProviderHandler? _providerHandler;
public event EventHandler<GameKey>? MatchEnded;
private ShortFormMatchEventViewItem? _lastEventItem;
public EventProcessor(IServiceProvider serviceProvider, IHubContext<TalkActive, ITalkActiveClient> hubContext)
{
_serviceProvider = serviceProvider;
_hubContext = hubContext;
}
public void SetGameKey(GameKey gameKey)
{
_gameKey = gameKey;
}
public void SetProviderHandler(IProviderHandler? providerHandler)
{
_providerHandler = providerHandler;
}
public async Task ProcessEventAsync(ShortFormMatchEventViewItem eventItem, CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
var serviceProvider = scope.ServiceProvider;
var eventRepository = serviceProvider.GetRequiredService<IEventRepository>();
var sportFormRepository = serviceProvider.GetRequiredService<ISportFormRepository>();
var settingRepository = serviceProvider.GetRequiredService<ISettingRepository>();
var existingMatch = await GetEventAsync(eventRepository, settingRepository, cancellationToken);
if (existingMatch != null)
{
eventItem.InnerTournamentId = existingMatch.TournamentId;
var sportForm = await GetSportFormAsync(existingMatch.Id, sportFormRepository, cancellationToken);
if (Enum.IsDefined(typeof(BaseMatchEvent), eventItem.Event))
{
await UpdateStatusesAsync(eventItem, existingMatch, sportForm, eventRepository, sportFormRepository,
serviceProvider,
cancellationToken);
}
await SendShortGameInfoToGameListener(eventItem, settingRepository, cancellationToken);
_lastEventItem = eventItem;
if (sportForm is { TradingStatus: TradingStatus.PreMatch or TradingStatus.Live })
{
_providerHandler ??= InitializeProviderHandler(serviceProvider);
var mathModelService = serviceProvider.GetRequiredService<IMathModelService>();
await mathModelService.ProcessMathModelForEventAsync(eventItem, existingMatch, sportForm,
_providerHandler, cancellationToken);
}
if (existingMatch.EventStatus == EventStatus.Finished &&
sportForm is { TradingStatus: TradingStatus.Finished })
{
OnMatchEnded();
}
}
}
private void OnMatchEnded()
{
if (_gameKey != null)
{
MatchEnded?.Invoke(this, _gameKey);
}
}
private IProviderHandler? InitializeProviderHandler(IServiceProvider serviceProvider)
{
if (_gameKey == null)
{
Console.WriteLine("GameKey is not set.");
return null;
}
var sportFormRepository = serviceProvider.GetRequiredService<ISportFormRepository>();
var existingSportForm = sportFormRepository.SportForms
.Include(z => z.Event)
.ThenInclude(z => z.Tournament)
.SingleOrDefault(z =>
z.Event.DisciplineGlobalId == _gameKey.DisciplineGlobalId &&
z.Event.EventId == _gameKey.GameId &&
z.Event.Tournament.TournamentId == _gameKey.TournamentId);
if (existingSportForm == null)
{
Console.WriteLine("Match not found.");
return null;
}
var providerHandlers = serviceProvider.GetServices<IProviderHandler>();
var providerHandler =
providerHandlers.FirstOrDefault(h => h.SportFormType == existingSportForm.SportFormType);
if (providerHandler != null) return providerHandler;
Console.WriteLine($"No provider handler found for SportFormType {existingSportForm.SportFormType}");
return null;
}
private async Task UpdateStatusesAsync(
ShortFormMatchEventViewItem eventItem, Event existingMatch, SportForm? sportForm,
IEventRepository eventRepository, ISportFormRepository sportFormRepository, IServiceProvider serviceProvider,
CancellationToken cancellationToken)
{
_providerHandler ??= InitializeProviderHandler(serviceProvider);
_providerHandler?.HandleBaseEvent(eventItem);
if (existingMatch.EventStatus != eventItem.EventStatus)
{
existingMatch.EventStatus = eventItem.EventStatus;
if (eventItem is {RoundIdx:0,Event: 1})
{
existingMatch.StartDT = DateTime.UtcNow;
eventItem.StartDateTime = existingMatch.StartDT;
}
eventRepository.Update(existingMatch);
await eventRepository.SaveChangesAsync(cancellationToken);
}
if (sportForm != null && sportForm.TradingStatus != eventItem.TradingStatus)
{
sportForm.TradingStatus = eventItem.TradingStatus;
sportFormRepository.Update(sportForm);
await sportFormRepository.SaveChangesAsync(cancellationToken);
}
}
private async Task<Event?> GetEventAsync(IEventRepository eventRepository, ISettingRepository settingRepository,
CancellationToken cancellationToken)
{
var existingMatch = await eventRepository.Events
.Include(z => z.Tournament)
.SingleOrDefaultAsync(
z =>
z.DisciplineGlobalId == _gameKey!.DisciplineGlobalId && z.EventId == _gameKey.GameId &&
z.Tournament.TournamentId == _gameKey.TournamentId, cancellationToken);
if (existingMatch != null) return existingMatch;
{
var f = await settingRepository.Settings
.AnyAsync(
z => _gameKey != null && z.Name == "UnidentifiedEvent" && z.Value == _gameKey.GameId.ToString(),
cancellationToken);
if (f) return existingMatch;
settingRepository.Create(new Setting
{
Name = "UnidentifiedEvent",
Value = _gameKey?.GameId.ToString() ?? "null"
});
await settingRepository.SaveChangesAsync(cancellationToken);
return existingMatch;
}
}
private async Task<SportForm?> GetSportFormAsync(int eventId, ISportFormRepository sportFormRepository,
CancellationToken cancellationToken)
{
return await sportFormRepository.SportForms
.Include(sf => sf.MarketGroups)
.ThenInclude(m => m.Markets)
.ThenInclude(mo => mo.Outcomes)
.SingleOrDefaultAsync(sf => sf.EventId == eventId, cancellationToken);
}
private async Task SendShortGameInfoToGameListener(ShortFormMatchEventViewItem eventItem, ISettingRepository settingRepository,
CancellationToken cancellationToken)
{
settingRepository.Create(new Setting
{
Name = $"SR {DateTime.UtcNow}",
Value = JsonConvert.SerializeObject(eventItem)
});
await settingRepository.SaveChangesAsync(cancellationToken);
var groupName = $"Event-{eventItem.DisciplineGlobalId}-{eventItem.TournamentId}-{eventItem.GameId}";
await _hubContext.Clients.Group(groupName)
.NewRabbitItem(eventItem, cancellationToken);
}
}
I try to manually acknowledge message, but it does not work
_channel.BasicConsume(queue: _settings.QueueName,
autoAck: false, // Use manual acknowledgment
consumer: consumer);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
try
{
var t = JsonConvert.DeserializeObject<QueueItemModel>(message);
if (t is { Form: 2 })
await HandleShortenEventInfoAsync(t, CancellationToken.None);
_channel.BasicAck(ea.DeliveryTag, false); // Acknowledge the message
}
catch (Exception ex)
{
Console.WriteLine($"Error processing message: {ex.Message}");
_channel.BasicNack(ea.DeliveryTag, false, true); // Requeue the message
}
};
本文标签: cRabbitMQ consumer does no catch all messages from exchangeStack Overflow
版权声明:本文标题:c# - RabbitMQ consumer does no catch all messages from exchange - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736308643a1933737.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论