admin管理员组

文章数量:1124392

I'm struggling to get this bidirectional stream gRPC endpoint to work in unit tests with canceling an application's startup.

I've tried googling and copilot all day yesterday and I'm going in circles. I don't understand why grpc is different handling a request that is a bidirectional stream vs a unary call and why setting the status of the context works when not in a stream...

Additional context, the backend "launchApplicationService" has event handlers that report progress, which the RPC call listens to, which is why listening for updates is not happening in the while loop directly. It's also worth noting that the server and the backend logic seem to respond appropriately to canceling the call via Postman, but my unit tests are not working. I also will need to implement actual cancellation from a client written in flutter/dart.

Some advice or resource links would be great to help me figure this out. Thanks, have a good one!

Here's the RPC method:

public override async Task StartApplication(IAsyncStreamReader<StartApplicationRequest> requestStream, IServerStreamWriter<StartApplicationReply> responseStream, ServerCallContext context)
{
    CancellationToken cancelToken = context.CancellationToken;
    while (!cancelToken.IsCancellationRequested && await requestStream.MoveNext(cancelToken))
    {
        StartApplicationRequest request = requestStream.Current;
        try
        {
            SubscribeHandlers();
            startupStream = responseStream;
            ConfigurationDto? configurationDto = configurationService.Get(request.ConfigurationId);
            if (configurationDto == null)
            {
                context.Status = new Status(StatusCode.NotFound, $"{Languages.GetTranslatedText(ApplicationCoreConstants.ErrorApiStatusKeys.FailStartApplicationNotFound)} {request.ConfigurationId}.");
                await WriteToStartupStream(running: false, progress: 0);
                break;
            }
            configurationsHelper.DisableConfigIfInvalid(configurationDto);
            if (!configurationDto.ConfigEnabled || configurationDto.Location == null)
            {
                context.Status = new Status(StatusCode.FailedPrecondition, $"{Languages.GetTranslatedText(SimulatorCoreConstants.ErrorApiStatusKeys.ConfigurationNotValid)} {request.ConfigurationId}.");
                await WriteToStartupStream(running: false, progress: 0);
                break;
            }
            await Task.Run(() => launchApplicationService.StartApplication(configurationDto, context.CancellationToken));
            await WriteToStartupStream(running: true, progress: 100);
        }
        catch (Exception ex)
        {
            logger.Error(ex.Message, ex);
            context.Status = new Status(StatusCode.Internal, $"{Languages.GetTranslatedText(ApplicationCoreConstants.ErrorApiStatusKeys.FailStartApplication)} Error: {ex.Message}");
            await WriteToStartupStream(running: false);
            break;
        }
    }
    ClearHandlers();
    if (cancelToken.IsCancellationRequested)
    {
        context.Status = new Status(StatusCode.Cancelled, "Test canceled");
        CancelStartup();
        await WriteToStartupStream(running: false);
    }
}

Here's the tests:

[TestMethod]
[DataRow(StatusCode.OK, DisplayName = "Start Application success")]
[DataRow(StatusCode.Internal, DisplayName = "Start Application error")]
[DataRow(StatusCode.NotFound, DisplayName = "Start Application doesn't find configuration")]
[DataRow(StatusCode.FailedPrecondition, DisplayName = "Start Application config not enabled")]
[DataRow(StatusCode.Cancelled, DisplayName = "Start Application canceled")]
public async Task TestStartApplication(StatusCode expectedCode)
{
    MockConfigurationService mockConfigurationService = new();
    MockUserPreferencesService mockUserPreferencesService = new();
    MockStartSimulationService mockStartApplicationService = new();
    MockStopAllService mockStopAllService = new();
    MockConfigurationsHelper mockConfigurationsHelper = new();
    GrpcServicesImpl.grpcService grpcService = new GrpcServicesImpl.grpcService(
        mockConfigurationService.Mock.Object, mockStartApplicationService.Mock.Object, mockStopAllService.Mock.Object, mockConfigurationsHelper.Mock.Object);
    CancellationTokenSource cancelToken = new();
    TestServerCallContext mockContext = new(cancelToken.Token);
    if (expectedCode == StatusCode.NotFound)
    {
        mockConfigurationService.Mock.Setup(x => x.Get(It.IsAny<int>())).Returns(value: null);
    }
    else if (expectedCode == StatusCode.Internal)
    {
        mockConfigurationService.Mock.Setup(x => x.Get(It.IsAny<int>())).Throws(new Exception());
    }
    else if (expectedCode == StatusCode.FailedPrecondition)
    {
        ConfigurationDto configuration = TestHelpers.ConfigurationDto(1, configEnabled: false);
        mockConfigurationService.Mock.Setup(x => x.Get(It.IsAny<int>())).Returns(configuration);
    }
    // This else runs for an expected OK or CANCELLED response.
    else
    {
        var config = TestHelpers.ConfigurationDto(1);
        config.Brand!.BrandShortName = "company";
        config.VehicleConfiguration ??= new();
        config.VehicleConfiguration.Platform = ApplicationCoreConstants.Platforms.Vehicle;
        mockConfigurationService.Mock.Setup(x => x.Get(It.IsAny<int>())).Returns(config);
    }
    mockUserPreferencesService.Mock.Setup(x => x.Get(It.IsAny<int>())).Returns(TestHelpers.UserPreferencesDto(1));
    mockStartApplicationService.Mock.Setup(x => x.StartApplication(It.IsAny<ConfigurationDto>(), It.IsAny<CancellationToken>())).Callback(() => { });
    Mock<IAsyncStreamReader<StartApplicationRequest>> mockRequestStream = new();
    Mock<IServerStreamWriter<StartApplicationReply>> mockResponseStream = new();
    Queue<StartApplicationRequest> requestQueue = new([new StartApplicationRequest() { ConfigurationId = 1 }]);
    mockRequestStream.Setup(x => x.MoveNext(It.IsAny<CancellationToken>())).ReturnsAsync(() => requestQueue.Count > 0);
    mockRequestStream.Setup(x => x.Current).Returns(requestQueue.Peek());
    mockResponseStream.Setup(x => x.WriteAsync(It.IsAny<StartApplicationReply>())).Returns(Task.CompletedTask);
    Action startup = async () =>
    {
        Thread.Sleep(100);
        await grpcService.StartApplication(mockRequestStream.Object, mockResponseStream.Object, mockContext);
    };
    await Task.Run(startup);
    if (expectedCode == StatusCode.Cancelled)
    {
        cancelToken.Cancel();
        mockResponseStream.Verify(x => x.WriteAsync(It.IsAny<StartApplicationReply>()), Times.Never());
    }
    else if (expectedCode == StatusCode.OK)
    {
        mockResponseStream.Verify(x => x.WriteAsync(It.IsAny<StartApplicationReply>()), Times.Once());
    }
    Assert.AreEqual(expectedCode, mockContext.Status.StatusCode);
}

The idea being that when a request is read, we await full application startup unless the config is not found or invalid or whatever. If a cancel gets requested, the while loop is broken out of and we set the context's status to CANCELLED. However, in unit testing, the test expecting a status code CANCELLED fails because it seems to always return OK, and I'm not sure why when the method is explicitly setting the status to CANCELLED.

本文标签: