I'm building a .NET service that connects to an EMQX broker using MQTTnet. The client connects to EMQX, but when I test by sending messages through MQTTX (an MQTT client tool), my service never receives them. The subscriptions don't seem to take effect, even though I call SubscribeAsync.
Code Structure
Program.cs (Service Setup)
var builder = Host.CreateApplicationBuilder(args);
// Configure services
builder.Services.AddSingleton<MQTTClient>();
builder.Services.AddHostedService<ServiceWorker>();
// Other configuration...
var host = builder.Build();
await host.RunAsync();
MQTTClient.cs
public class MQTTClient
{
    private readonly ILogger<MQTTClient> _logger;
    private readonly IConfiguration _configuration;
    private readonly IManagedMqttClient _mqttClient;

    public MQTTClient(ILogger<MQTTClient> logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
        _mqttClient = new MqttFactory().CreateManagedMqttClient();
        _mqttClient.ConnectedAsync += async (MqttClientConnectedEventArgs eventArgs) =>
        {
            _logger.LogInformation("Connected to MQTT server");
        };
        _mqttClient.DisconnectedAsync += async (MqttClientDisconnectedEventArgs eventArgs) =>
        {
            _logger.LogInformation("Disconnected from MQTT server");
        };
        _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        string connectionstring = _configuration["MQTT:Connectionstring"];
        string username = _configuration["MQTT:Username"];
        string password = _configuration["MQTT:Password"];
        string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";
        bool ws = connectionstring.StartsWith("ws");
        var options = new MqttClientOptionsBuilder()
            .WithClientId(mqttClientId)
            .WithCredentials(username, password)
            .WithTls()
            .WithCleanSession();
        if (ws)
            options.WithWebSocketServer(connectionstring);
        else
            options.WithTcpServer(connectionstring);
        var builtOptions = options.Build();
        var mqttOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(builtOptions)
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(3))
            .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)
            .Build();
        await _mqttClient.StartAsync(mqttOptions);
    }

    public async Task SubscribeAsync()
    {
        await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]);
        await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]);
    }

    private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
    {
        var topic = eventArgs.ApplicationMessage.Topic;
        _logger.LogInformation("Message received: {topic}", topic);
        try
        {
            if (topic.EndsWith(_configuration["MQTT:Addtopic"]))
            {
                _logger.LogInformation("Processing single client add request");
                var data = JsonSerializer.Deserialize<RequestData>(eventArgs.ApplicationMessage.Payload);
                // Process message...
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
        }
    }
}
ServiceWorker.cs
public class ServiceWorker : BackgroundService
{
    private readonly ILogger<ServiceWorker> _logger;
    private readonly MQTTClient _mqttClient;

    public ServiceWorker(ILogger<ServiceWorker> logger, MQTTClient mqttClient)
    {
        _logger = logger;
        _mqttClient = mqttClient;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);
        await _mqttClient.StartAsync(stoppingToken);
        await _mqttClient.SubscribeAsync();
        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(1000, stoppingToken);
        }
        await _mqttClient.StopAsync(stoppingToken);
    }
}
Here are the logs
Using what @Brits suggested, I was able to get some log output. Here's what it looks like.
[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) <<< ConnAck: [ReturnCode=ConnectionAccepted] [ReasonCode=Success] [IsSessionPresent=False]
[15:18:18 INF] Authenticated MQTT connection with server established.
[15:18:18 INF] Connected.
[15:18:18 INF] Connected to MQTT server
[15:18:18 INF] Start sending keep alive packets.
[15:18:18 INF] Publishing subscriptions at reconnect
[15:18:18 INF] Publishing 2 added and 0 removed subscriptions
[15:18:18 INF] TX (74 bytes) >>> Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]
What I've Confirmed:
- The service successfully connects to EMQX (confirmed via logs and EMQX dashboard)
- The connection remains stable
- I can successfully publish test messages using MQTTX to the topics I'm trying to subscribe to
- The MQTTX messages are visible in the MQTTX History
- My service's HandleMessageReceived method is never triggered when messages are sent via MQTTX
Testing Setup:
- Using MQTTX to publish test messages to the topics
- Topics in configuration match exactly what I'm publishing to in MQTTX
- Messages are successfully published (confirmed in MQTTX which is subscribed to the same topic)
- Service is running and connected during testing
Environment:
- .NET 6.0
- MQTTnet Version="4.1.4.563"
- EMQX broker
- MQTTX
Question:
Why isn't my service receiving messages that I publish through MQTTX? The client connects successfully, but messages sent through MQTTX never trigger the HandleMessageReceived method. What could be preventing the message reception?
1 Answer
Did you try checking your ACL?
In your ACL file, which you can access via https://<broker url>/#/authorization/detail/file, make sure that your IP isn't blacklisted and that your Client ID isn't a string the rules reject.
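One detail in the question's log seems to support looking at authorization: the SubAck carries UnspecifiedError for both topic filters. The 4-byte ConnAck and 6-byte SubAck suggest an MQTT 3.1.1 session, where the only failure return code is 0x80, meaning the broker refused the subscription. The sketch below decodes a raw SUBACK packet to make that visible. SubAckDecoder is a hypothetical helper, not part of MQTTnet, and the byte sequence in the usage note is reconstructed from the logged field values rather than captured from the wire.

```csharp
using System;

// Hypothetical helper (not part of MQTTnet): decodes a raw MQTT 3.1.1 SUBACK packet.
public static class SubAckDecoder
{
    // Maps one SUBACK return code to its meaning (MQTT 3.1.1, section 3.9.3).
    public static string Describe(byte returnCode) => returnCode switch
    {
        0x00 => "Granted, maximum QoS 0",
        0x01 => "Granted, maximum QoS 1",
        0x02 => "Granted, maximum QoS 2",
        0x80 => "Failure (subscription refused)",
        _ => "Reserved value",
    };

    // Returns the packet identifier and one return code per requested topic filter.
    public static (ushort PacketId, byte[] ReturnCodes) Decode(byte[] packet)
    {
        // Fixed header (2 bytes) + packet identifier (2 bytes) + at least one return code.
        if (packet is null || packet.Length < 5)
            throw new ArgumentException("SUBACK needs at least 5 bytes.", nameof(packet));

        if (packet[0] != 0x90)
            throw new ArgumentException("First byte is not the SUBACK packet type (0x90).", nameof(packet));

        // Simplification: only a single-byte remaining length (under 128) is handled.
        int remainingLength = packet[1];
        if ((remainingLength & 0x80) != 0)
            throw new NotSupportedException("Multi-byte remaining length is not handled in this sketch.");

        if (remainingLength != packet.Length - 2)
            throw new ArgumentException("Remaining length does not match the packet size.", nameof(packet));

        // The packet identifier is big-endian.
        ushort packetId = (ushort)((packet[2] << 8) | packet[3]);

        // Everything after the packet identifier is the list of return codes.
        var returnCodes = new byte[remainingLength - 2];
        Array.Copy(packet, 4, returnCodes, 0, returnCodes.Length);
        return (packetId, returnCodes);
    }
}
```

Calling SubAckDecoder.Decode on the bytes 90 04 00 01 80 80 yields packet identifier 1 and two 0x80 return codes, which Describe reports as "Failure (subscription refused)". In other words, the broker accepted the connection but rejected both subscriptions, so no messages would ever be routed to the service until the rule that denies them is changed.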
… _configuration["MQTT:Username"] etc. and a subscription to MQTT:Addtopic? If you do this and the message is received, it ensures the broker setup is correct (and messages are not blocked by ACLs etc.). If that works, some logs would help to trace the issue. – Brits Commented Feb 23 at 23:41