admin管理员组

文章数量:1302328

Overview of problem:

I have two services with two different endpoints, I would like to exchange messages between them using pub/sub only. In the future there may be other services that need to process these messages (that is why pub/sub).

I have implemented this and it is working:

Service 1 (client) can publish messages and Service 2 (server) is subscribed to these messages, receives, and processes them.

When Service 2 (server) attempts to publish a response (result of process) to the topic from inside the message handler - the message is published and received by Service 1 (client) as expected --- however, on Service 2 (server)'s side there is an error generated:

Immediate Retry is going to retry message ... because of an exception:
System.InvalidOperationException: No handlers could be found for message type: TP.GetDocumentInfoRes
at NServiceBus.LoadHandlersConnector.Invoke...
at NServiceBus.ProcessingStatisticsBehavior.Invoke...
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke...
at NServiceBus.Transport.AzureServiceBus.MessagePump.ProcessMessage...

It seems that NServiceBus is expecting/requiring Service 2 (server) - the service that is publishing the message - to have a message handler for the message that it is publishing.

I can confirm that this is the case by creating a message handler on Service 2 (server) for the type that it is publishing that does nothing - and that stops the error from being generated.

Question:

What am I doing wrong/misunderstand with regard to how NServiceBus implements pub/sub - how do I avoid generating this exception and the corresponding retries?

I would expect the published messages to be published and copies distributed to the queue of each subscriber regardless of whether or not that subscriber is online or not (I believe this works)

I would not expect the publisher to be required to have a handler for the type it publishes and certainly not for it to be verified/enforced. I'm expecting pub/sub to be fire and fet as some subscribers might be offline at any given time.

Things I have already done

I have already implemented the NServiceBus Pub/Sub examples like Ping Pong and have noticed this same behavior if I break that sample into two separate services with two separate endpoints.

More Info

When I examine the error message properties, it looks like service 2 is replying to itself separately from the publish.

Message properties show fields like:

replyTo: Service2_dev
NServiceBus.ReplyToAddress: Service2_dev
NServiceBus.OriginatingEndpoint: Service2_dev
NServiceBus.ProcessingEndpoint: Service2_dev
NServiceBus.MessageIntent: Publish

Code

Message:

public interface IMessage : IEvent
{
   ...
}

public class Message : IMessage
{
    ...
}

public interface IRequest : IMessage {}

public class Request : Message, IRequest
{
    ...
}

public interface IResponse<T> : IMessage where T : IMessage 
{
    public T Request {get;}

    ...
}

public abstract class Response<T> : Message, IResponse<T> where T : IMessage
{
    public T Request {get; set;}

    ...
}

public interface IGetDocumentInfoRequest : IRequest
{
    GetDocumentInfoCriteria Criteria { get; set; }
}

[DataContract]
public class GetDocumentInfoReq : Request, IGetDocumentInfoRequest
{
    [DataMember]
    public GetDocumentInfoCriteria Criteria { get; set; }

    ...
}

public interface IGetDocumentInfoResponse : IResponse<GetDocumentInfoReq>
{
    GetDocumentInfoResult Result { get; set; }
}

[DataContract]
public class GetDocumentInfoRes : Response<GetDocumentInfoReq>, IGetDocumentInfoResponse, IEvent
{
    [DataMember]
    public GetDocumentInfoResult Result { get; set; }

    ...
}

Both Services' Program Main

public class Program
{
    public static void Main(string[] args)
    {
        HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
        ...
        EndpointConfiguration endpointConfiguration = new 
        EndpointConfiguration(${builder.Configuration["ServiceName"]}
        _{builder.Configuration["Instance"]}");
        endpointConfiguration.UseTransport(new AzureServiceBusTransport(
        builder.Configuration.GetConnectionString("AzureServiceBusConnectionString")));
        endpointConfiguration.UseSerialization<SystemJsonSerializer>();
        endpointConfiguration.EnableInstallers();

        builder.UseNServiceBus(endpointConfiguration);
        ...
     }
}

Service 1 (client)

In a Quartz task calls publish on IMessageSession:

        await MSG.Publish(new GetDocumentInfoReq(
        Guid.Parse("db94a8a8-7ade-4ae3-81fe-4b4b578f4444"), Guid.NewGuid(), criteria));

Handles response message:

public class GetDocumentInfoResponseHandler : ResponseMessageHandler<GetDocumentInfoRes>,
IHandleMessages<GetDocumentInfoRes>
{
    ...

    public async override Task BusinessLogicAsync(GetDocumentInfoRes Message,
    IMessageHandlerContext Context)
    {
        DAL.SaveGetDocumentInfoResult(Message.Result);

        await Task.CompletedTask;
    }
}

Service 2 (Server)

Message handler that processes request and publishes response:

public class GetDocumentInformationRequestHandler :
RequestMessageHandler<GetDocumentInfoReq>, IHandleMessages<GetDocumentInfoReq>
{
    ...

    public async override Task BusinessLogicAsync(GetDocumentInfoReq Message,
    IMessageHandlerContext Context)
    {
        await Context.Publish(new GetDocumentInfoRes(Message, await 
        CLN.GetDocumentInfoAsync(Message.Criteria.ToGetDocumentInfoCriteria(loginResult)), 
        this.ProcessingBegan, DateTime.Now));
        //Publishing here is what is generating the no subscriber error for Service 2 (this
        service that is publishing the message)

    }
}

Queues and Topics

Queues in Azure portal looked correct:

Service1_dev
Service2_dev
Error <-- where the no sub errors ended up

The topics/subscriptions looked correct as well:

bundle-1
   service1_dev
     $default
     TP.GetDocumentInfoRes
   service2_dev
     $default
     TP.GetDocumentInfoReq

*When I added the dummy response handler to Service 2 it of course added a subscription for that

Any help would be greatly appreciated.

Overview of problem:

I have two services with two different endpoints, I would like to exchange messages between them using pub/sub only. In the future there may be other services that need to process these messages (that is why pub/sub).

I have implemented this and it is working:

Service 1 (client) can publish messages and Service 2 (server) is subscribed to these messages, receives, and processes them.

When Service 2 (server) attempts to publish a response (result of process) to the topic from inside the message handler - the message is published and received by Service 1 (client) as expected --- however, on Service 2 (server)'s side there is an error generated:

Immediate Retry is going to retry message ... because of an exception:
System.InvalidOperationException: No handlers could be found for message type: TP.GetDocumentInfoRes
at NServiceBus.LoadHandlersConnector.Invoke...
at NServiceBus.ProcessingStatisticsBehavior.Invoke...
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke...
at NServiceBus.Transport.AzureServiceBus.MessagePump.ProcessMessage...

It seems that NServiceBus is expecting/requiring Service 2 (server) - the service that is publishing the message - to have a message handler for the message that it is publishing.

I can confirm that this is the case by creating a message handler on Service 2 (server) for the type that it is publishing that does nothing - and that stops the error from being generated.

Question:

What am I doing wrong/misunderstand with regard to how NServiceBus implements pub/sub - how do I avoid generating this exception and the corresponding retries?

I would expect the published messages to be published and copies distributed to the queue of each subscriber regardless of whether or not that subscriber is online or not (I believe this works)

I would not expect the publisher to be required to have a handler for the type it publishes and certainly not for it to be verified/enforced. I'm expecting pub/sub to be fire and fet as some subscribers might be offline at any given time.

Things I have already done

I have already implemented the NServiceBus Pub/Sub examples like Ping Pong and have noticed this same behavior if I break that sample into two separate services with two separate endpoints.

More Info

When I examine the error message properties, it looks like service 2 is replying to itself separately from the publish.

Message properties show fields like:

replyTo: Service2_dev
NServiceBus.ReplyToAddress: Service2_dev
NServiceBus.OriginatingEndpoint: Service2_dev
NServiceBus.ProcessingEndpoint: Service2_dev
NServiceBus.MessageIntent: Publish

Code

Message:

public interface IMessage : IEvent
{
   ...
}

public class Message : IMessage
{
    ...
}

public interface IRequest : IMessage {}

public class Request : Message, IRequest
{
    ...
}

public interface IResponse<T> : IMessage where T : IMessage 
{
    public T Request {get;}

    ...
}

public abstract class Response<T> : Message, IResponse<T> where T : IMessage
{
    public T Request {get; set;}

    ...
}

public interface IGetDocumentInfoRequest : IRequest
{
    GetDocumentInfoCriteria Criteria { get; set; }
}

[DataContract]
public class GetDocumentInfoReq : Request, IGetDocumentInfoRequest
{
    [DataMember]
    public GetDocumentInfoCriteria Criteria { get; set; }

    ...
}

public interface IGetDocumentInfoResponse : IResponse<GetDocumentInfoReq>
{
    GetDocumentInfoResult Result { get; set; }
}

[DataContract]
public class GetDocumentInfoRes : Response<GetDocumentInfoReq>, IGetDocumentInfoResponse, IEvent
{
    [DataMember]
    public GetDocumentInfoResult Result { get; set; }

    ...
}

Both Services' Program Main

public class Program
{
    public static void Main(string[] args)
    {
        HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
        ...
        EndpointConfiguration endpointConfiguration = new 
        EndpointConfiguration(${builder.Configuration["ServiceName"]}
        _{builder.Configuration["Instance"]}");
        endpointConfiguration.UseTransport(new AzureServiceBusTransport(
        builder.Configuration.GetConnectionString("AzureServiceBusConnectionString")));
        endpointConfiguration.UseSerialization<SystemJsonSerializer>();
        endpointConfiguration.EnableInstallers();

        builder.UseNServiceBus(endpointConfiguration);
        ...
     }
}

Service 1 (client)

In a Quartz task calls publish on IMessageSession:

        await MSG.Publish(new GetDocumentInfoReq(
        Guid.Parse("db94a8a8-7ade-4ae3-81fe-4b4b578f4444"), Guid.NewGuid(), criteria));

Handles response message:

public class GetDocumentInfoResponseHandler : ResponseMessageHandler<GetDocumentInfoRes>,
IHandleMessages<GetDocumentInfoRes>
{
    ...

    public async override Task BusinessLogicAsync(GetDocumentInfoRes Message,
    IMessageHandlerContext Context)
    {
        DAL.SaveGetDocumentInfoResult(Message.Result);

        await Task.CompletedTask;
    }
}

Service 2 (Server)

Message handler that processes request and publishes response:

public class GetDocumentInformationRequestHandler :
RequestMessageHandler<GetDocumentInfoReq>, IHandleMessages<GetDocumentInfoReq>
{
    ...

    public async override Task BusinessLogicAsync(GetDocumentInfoReq Message,
    IMessageHandlerContext Context)
    {
        await Context.Publish(new GetDocumentInfoRes(Message, await 
        CLN.GetDocumentInfoAsync(Message.Criteria.ToGetDocumentInfoCriteria(loginResult)), 
        this.ProcessingBegan, DateTime.Now));
        //Publishing here is what is generating the no subscriber error for Service 2 (this
        service that is publishing the message)

    }
}

Queues and Topics

Queues in Azure portal looked correct:

Service1_dev
Service2_dev
Error <-- where the no sub errors ended up

The topics/subscriptions looked correct as well:

bundle-1
   service1_dev
     $default
     TP.GetDocumentInfoRes
   service2_dev
     $default
     TP.GetDocumentInfoReq

*When I added the dummy response handler to Service 2 it of course added a subscription for that

Any help would be greatly appreciated.

Share Improve this question edited Feb 11 at 1:24 gsn1074 asked Feb 10 at 23:38 gsn1074gsn1074 1362 silver badges11 bronze badges
Add a comment  | 

2 Answers 2

Reset to default 0

I figured out the issue:

The default filters for the topic use a sql LIKE with wildcards at the beginning and end of the message type's fully qualitified name.

Ex.

[NServiceBus.EnclosedMessageTypes] LIKE '%TP.GetDocumentInfoReq%'

My response messages contain the request message inside them; the response message type is a generic where T is the request message type.

Ex.

public class GetDocumentInfoRes : Response<GetDocumentInfoReq>, IGetDocumentInfoResponse, IEvent

When the filter compares the SQL like statement to the NServiceBus.EnclosedMessageTypes field, it finds a match to the generic parameter of the response message:

TP.GetDocumentInfoRes, Services.Messages, Version=1.0.2.0, Culture=neutral, PublicKeyToken=null;TP.IGetDocumentInfoResponse, Services.Messages, Version=1.0.2.0, Culture=neutral, PublicKeyToken=null;TP.Messages.Common.IResponse`1[[TP.GetDocumentInfoReq, Services.Messages, Version=1.0.2.0, Culture=neutral, PublicKeyToken=null]], Messages.Common, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null;TP.Messages.Common.IMessage, Messages.Common, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null;TP.Messages.Common.Response`1[[TP.GetDocumentInfoReq, Services.Messages, Version=1.0.2.0, Culture=neutral, PublicKeyToken=null]], Messages.Common, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null;TP.Messages.Common.Message, Messages.Common, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null

So the request subscription on Service2 was also subscribing to the response.

Solutions:

  1. I removed the leading % in the like statement to make the filter match the first type mentioned in the EnclosedMessageTypes field. This worked but it's not ideal as it would require me to manually modified the filters for every subscription, unless I can find a configuration setting that would modified the default filter format

  2. I could remove the request object from the response object - this would be unfortunate as it is super convenient to have it there and it is how my systems have worked for the past 1.5 decades using MassTransit & RabbitMq

The subscription rules uses % both at the beginning and at the end of the LIKE clause to support the following scenarios:

  • Interface-based inheritance
  • Evolution of the message contract

More docs here.

If you prefer to create your own subscription rules in general, you can:

Disable AutoSubscribe

To disable the feature add this line to your endpoint configuration:

endpointConfiguration.DisableFeature<AutoSubscribe>();

More information here.

Don't run EnableInstallers

By default, installers are disabled. They are enabled by this code:

endpointConfiguration.EnableInstallers();

// this will run the installers await
Endpoint.Start(endpointConfiguration);

So by not making the call to EnableInstallers you will opt out of auto generating any transport infrastructure.

More information here.

本文标签: