admin管理员组

文章数量:1356386

The application is written in C#, runs on .NET 8, and uses EventStore 23.10.0-jammy.

The code subscribes to the event store, but the subscription is constantly dropped with the following exception:

{
    "Status": 
    {
        "StatusCode": "Unavailable",
        "Detail": "Error reading next message. HttpIOException: The response ended prematurely while waiting for the next frame from the server. (ResponseEnded)",
        "DebugException":
        {
            "HttpRequestError": "ResponseEnded",
            "Message": "The response ended prematurely while waiting for the next frame from the server. (ResponseEnded)",
            "TargetSite": "Void ThrowRequestAborted(System.Exception)",
            "Data": [],
            "InnerException": null,
            "HelpLink": null,
            "Source": "System.Net.Http",
            "HResult": -2146232800,
            "StackTrace":"   at System.Net.Http.Http2Connection.ThrowRequestAborted(Exception innerException)\r\n   at System.Net.Http.Http2Connection.Http2Stream.CheckResponseBodyState()\r\n   at System.Net.Http.Http2Connection.Http2Stream.TryReadFromBuffer(Span`1 buffer, Boolean partOfSyncRead)\r\n   at System.Net.Http.Http2Connection.Http2Stream.ReadDataAsync(Memory`1 buffer, HttpResponseMessage responseMessage, CancellationToken cancellationToken)\r\n   at Grpc.Net.Client.StreamExtensions.ReadMessageAsync[TResponse](Stream responseStream, GrpcCall call, Func`2 deserializer, String grpcEncoding, Boolean singleMessage, CancellationToken cancellationToken)",
            "$type": "HttpIOException"
        },
        "$type": "Status"
    },
    "StatusCode": "Unavailable",
    "Trailers": [],
    "TargetSite": "Void MoveNext()",
    "Message": "Status(StatusCode=\"Unavailable\", Detail=\"Error reading next message. HttpIOException: The response ended prematurely while waiting for the next frame from the server. (ResponseEnded)\", DebugException=\"System.Net.Http.HttpIOException: The response ended prematurely while waiting for the next frame from the server. (ResponseEnded)\r\n   at System.Net.Http.Http2Connection.ThrowRequestAborted(Exception innerException)\r\n   at System.Net.Http.Http2Connection.Http2Stream.CheckResponseBodyState()\r\n   at System.Net.Http.Http2Connection.Http2Stream.TryReadFromBuffer(Span`1 buffer, Boolean partOfSyncRead)\r\n   at System.Net.Http.Http2Connection.Http2Stream.ReadDataAsync(Memory`1 buffer, HttpResponseMessage responseMessage, CancellationToken cancellationToken)\r\n   at Grpc.Net.Client.StreamExtensions.ReadMessageAsync[TResponse](Stream responseStream, GrpcCall call, Func`2 deserializer, String grpcEncoding, Boolean singleMessage, CancellationToken cancellationToken)\")",
    "Data": [], 
    "InnerException": null,
    "HelpLink": null,
    "Source": "EventStore.Client",
    "HResult": -2146233088,
    "StackTrace":"   at EventStore.Client.Interceptors.TypedExceptionInterceptor.AsyncStreamReader`1.MoveNext(CancellationToken cancellationToken)\r\n   at EventStore.Client.AsyncStreamReaderExtensions.ReadAllAsync[T](IAsyncStreamReader`1 reader, CancellationToken cancellationToken)+MoveNext()\r\n   at EventStore.Client.AsyncStreamReaderExtensions.ReadAllAsync[T](IAsyncStreamReader`1 reader, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()\r\n   at System.Linq.AsyncEnumerable.SelectEnumerableAsyncIterator`2.MoveNextCore() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Select.cs:line 221\r\n   at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 70\r\n   at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 75\r\n   at EventStore.Client.EventStoreClient.ReadInternal(ReadReq request, UserCredentials userCredentials, CancellationToken cancellationToken)+MoveNext()\r\n   at EventStore.Client.EventStoreClient.ReadInternal(ReadReq request, UserCredentials userCredentials, CancellationToken cancellationToken)+MoveNext()\r\n   at EventStore.Client.EventStoreClient.ReadInternal(ReadReq request, UserCredentials userCredentials, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()\r\n   at EventStore.Client.StreamSubscription.Enumerable.Enumerator.MoveNextAsync()\r\n   at EventStore.Client.StreamSubscription.Subscribe()\r\n   at EventStore.Client.StreamSubscription.Subscribe()",
    "$type": "RpcException"
} 

What is the reason for the dropped subscription? How can I avoid a dropped connection?

Code:

/// <summary>
/// Opens a subscription through <c>_client.SubscribeToAllAsync</c> and forwards every
/// received event, wrapped in an <see cref="EventWithMeta"/>, to <paramref name="eventHandler"/>.
/// </summary>
/// <param name="startFromPosition">
/// Position to resume after. When <c>null</c> the subscription starts from <c>FromAll.Start</c>;
/// otherwise the same value is passed for both arguments of <c>Position</c>
/// (presumably commit and prepare position — confirm against the client API).
/// </param>
/// <param name="eventHandler">Callback awaited once per received event.</param>
/// <param name="droppedHandler">
/// Optional callback invoked when the client reports the subscription as dropped. It receives
/// a new wrapper around the dropped subscription, the drop reason as text, and the exception
/// (if any) supplied by the client.
/// </param>
/// <returns>A <c>CustomEventStoreSubscription</c> wrapping the client subscription.</returns>
/// <remarks>
/// This method does not resubscribe after a drop; the caller has to do that from
/// <paramref name="droppedHandler"/>. The cancellation token handed to the per-event callback
/// by the client is not forwarded to <paramref name="eventHandler"/>.
/// </remarks>
public async Task<ICustomEventStoreSubscription> SubscribeToEventsAsync(ulong? startFromPosition,
    Func<EventWithMeta, Task> eventHandler,
    Action<ICustomEventStoreSubscription,string,Exception?>? droppedHandler)
{
    // Resume strictly after the supplied position, or read from the very beginning.
    var origin = startFromPosition.HasValue
        ? FromAll.After(new Position(startFromPosition.Value, startFromPosition.Value))
        : FromAll.Start;

    var clientSubscription = await _client.SubscribeToAllAsync(
        origin,
        OnEventAppeared,
        subscriptionDropped: OnSubscriptionDropped);

    return new CustomEventStoreSubscription(clientSubscription);

    // Converts the raw resolved event into an EventWithMeta and hands it to the caller's handler.
    async Task OnEventAppeared(StreamSubscription source, ResolvedEvent received, CancellationToken token)
    {
        var deserialized = DeserializeEventOrNull(EventTypeAndData.From(received.Event));
        var commitPosition = received.OriginalPosition?.CommitPosition;
        var eventNumber = received.OriginalEventNumber.ToUInt64();

        var wrapped = new EventWithMeta(deserialized.Item1, deserialized.Item2, commitPosition, eventNumber);
        await eventHandler(wrapped);
    }

    // Relays a drop notification to the caller, if a drop handler was supplied.
    void OnSubscriptionDropped(StreamSubscription source, SubscriptionDroppedReason reason, Exception? error)
    {
        if (droppedHandler is null)
        {
            return;
        }

        droppedHandler(new CustomEventStoreSubscription(source), reason.ToString(), error);
    }
}

本文标签: c# - EventStore drops subscription constantly - Stack Overflow