admin管理员组

文章数量:1313284

I have a WPF application that I require SSE. I was looking into websockets but this seems to be quite hard to anise using CPANEL and WHM due to port restrictions etc.

The SSE is running and may run for hours. I have a reconnect system which fires if the connection is lost.

I have a 30 second heartbeat from the server back to my client which seems to run fine.

My issue is that I can leave the system running for hours and when i check the log I actually have heaps of IO Exception - server timeouts (then reconnect). There are no logs on the node server saying there are issues.

This is intermittent. It can run for 10 hours with no issue and then othertimes I get a dropout every few minutes just running for one hour.

public static async Task StartSSEListenerAsync(CancellationToken cancellationToken)
{
    string fullUrl = $"{URLSettings.getServerName(APITargets.PHAPPS)}/{APICalls.EventStream.Subscribe}?computerName={Uri.EscapeDataString(computerName)}";
    string authToken = URLSettings.tennantAPIKey;
    int retryCount = 0;
    const int maxRetries = 5;
    const int retryDelay = 5000; // 5 seconds

    if (!client.DefaultRequestHeaders.Contains("X-Token"))
    {
        client.DefaultRequestHeaders.Add("X-Token", authToken);
    }

    //client.Timeout = Timeout.InfiniteTimeSpan;
    const int maxSilentTime = 120000; // 2 minutes
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            StreamLogger.Info("Attempting to connect to SSE...");
            using (var response = await client.GetAsync(fullUrl, HttpCompletionOption.ResponseHeadersRead, cancellationToken))
            {
                response.EnsureSuccessStatusCode();
                StreamLogger.Info("SSE Connected.");
                Console.WriteLine("SSE Connected.");

                retryCount = 0; // Reset retry count on successful connection
                _lastEventTime = DateTime.UtcNow;

                using (var stream = await response.Content.ReadAsStreamAsync())
                using (var reader = new StreamReader(stream))
                {
                    while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested)
                    {
                        var line = await reader.ReadLineAsync();
                        if (!string.IsNullOrEmpty(line) && line.StartsWith("data: "))
                        {
                            _lastEventTime = DateTime.UtcNow;
                            string json = line.Substring(6); // Remove "data: "
                            var eventData = JsonConvert.DeserializeObject<StreamEventData>(json);
                            StreamLogger.Info($"Event Data received: {eventData.EventType}");
                            Console.WriteLine($"Event Data received: {eventData.EventType}");
                            try
                            {
                                switch (eventData.EventType)
                                {
                                    case ListenerEvents.QudaDashboardUpdate:
                                        EventReceived_QudaDashboardUpdate?.Invoke(null, eventData.Data);
                                        break;
                                    case ListenerEvents.QudaSettingsModelUpdate:
                                        EventReceived_QudaSettingsModelUpdate?.Invoke(null, eventData.Data);
                                        break;
                                    case ListenerEvents.APIRequestReconnect:
                                        EventReceived_APIRequestReconnect?.Invoke(null, eventData.Data);
                                        break;
                                    case ListenerEvents.HaloPayWarningUpdate:
                                        EventReceived_HaloPayWarningUpdate?.Invoke(null, eventData.Data);
                                        break;
                                    default:
                                        AppLogger.Warning($"Unexpected stream event received: {eventData.EventType}");
                                        break;
                                }
                            }
                            catch (Exception ex)
                            {
                                AppLogger.Error($"Error processing event: {ex.Message}\n{ex.StackTrace}");
                            }
                        }
                        else
                        {
                            Console.WriteLine(line);
                        }
                        
                    }
                }
            }

            StreamLogger.Warning("SSE Stream ended. Attempting to reconnect...");
        }
        catch (IOException ioEx)
        {
            retryCount++;
            string innerExceptionMessage = ioEx.InnerException?.Message ?? "No inner exception";
            StreamLogger.Error($"IOException occurred: {ioEx.Message}\nInner Exception: {innerExceptionMessage}\nStack Trace: {ioEx.StackTrace}");

            if (retryCount > maxRetries)
            {
                StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
                break;
            }

            StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
            await Task.Delay(retryDelay, cancellationToken);
        }
        catch (HttpRequestException httpEx)
        {
            retryCount++;
            StreamLogger.Error($"HttpRequestException occurred: {httpEx.Message}\n{httpEx.StackTrace}");
            if (retryCount > maxRetries)
            {
                StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
                break;
            }
            StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
            await Task.Delay(retryDelay, cancellationToken);
        }
        catch (TaskCanceledException taskCanceledEx) when (!cancellationToken.IsCancellationRequested)
        {
            retryCount++;
            StreamLogger.Warning($"TaskCanceledException occurred (likely due to timeout): {taskCanceledEx.Message}");
            if (retryCount > maxRetries)
            {
                StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
                break;
            }
            StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
            await Task.Delay(retryDelay, cancellationToken);
        }
        catch (Exception ex)
        {
            retryCount++;
            StreamLogger.Error($"Unexpected exception occurred: {ex.Message}\n{ex.StackTrace}");
            if (retryCount > maxRetries)
            {
                StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
                break;
            }
            StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
            await Task.Delay(retryDelay, cancellationToken);
        }
    }
}

NodeJS:

router.get('/Subscribe', verifyJWT, (req, res) => {
    // Log all events on req and res
    //logEvents(req, 'Request');
    //logEvents(res, 'Response');

    const chemistID = req.user._id;
    const computerName = req.queryputerName;

    if (!computerName) {
        return res.status(400).send("Missing computer name");
    }

    if (!globals.clients[chemistID]) {
        globals.clients[chemistID] = {};
    }

    if (globals.clients[chemistID][computerName]) {
        console.log(`Replacing existing connection for ChemistID: ${chemistID}, Computer: ${computerName}`);
        globals.clients[chemistID][computerName].res.end();
    }

    globals.clients[chemistID][computerName] = { res };

    console.log(`Device Connected: ChemistID ${chemistID}, Computer ${computerName}`);

    req.on('aborted', () => {
    //console.log(`Connection aborted: ChemistID ${chemistID}, Computer ${computerName}`);
});

req.on('close', () => {
    try {
        //console.log(`Connection closed: ChemistID ${chemistID}, Computer ${computerName}`);
        delete globals.clients[chemistID][computerName];

        if (Object.keys(globals.clients[chemistID]).length === 0) {
            console.log(`No more clients for ChemistID ${chemistID}. Cleaning up.`);
            delete globals.clients[chemistID];
        }
    } catch (err) {
        console.error(`Error during connection close cleanup: ${err.message}`);
    }
});


    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();
});


// Attach an event logger to the request object
function logEvents(emitter, emitterName) {
    const originalEmit = emitter.emit;

    emitter.emit = function (event, ...args) {
        console.log(`[${emitterName}] Event: ${event}`);
        return originalEmit.call(emitter, event, ...args);
    };
}




// Send periodic heartbeats to all clients
setInterval(() => {
    Object.keys(globals.clients).forEach(chemistID => {
        Object.keys(globals.clients[chemistID]).forEach(clientName => {
            const client = globals.clients[chemistID][clientName];

            try {
                client.res.write(`: heartbeat\n\n`); // Send a comment line as a heartbeat
                //console.log("heartbeat sent")
            } catch (error) {
                console.error(`Failed to send heartbeat to ChemistID: ${chemistID}, Computer: ${clientName}. Removing client.`);
                
                // Remove the stale client
                delete globals.clients[chemistID][clientName];

                // If no more clients for this chemistID, clean up the chemist entry
                if (Object.keys(globals.clients[chemistID]).length === 0) {
                    delete globals.clients[chemistID];
                }
            }
        });
    });
}, 15000); // Every 45 seconds

I would apprecaite any help with this because I can't seem to find a log or anything that points me in the direction of WHY this is acutally occuring.

I have checked my main C# Logs and just get this info:

[Error] [31/01/2025 03:09:51] [STM] IOException occurred: The read operation failed, see inner exception.
Inner Exception: The operation has timed out.
Stack Trace:    at System.Net.Http.HttpClientHandler.WebExceptionWrapperStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.Net.Http.DelegatingStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.StreamReader.ReadBuffer()
   at System.IO.StreamReader.get_EndOfStream()
   at PH.Common.SharedUI.Services.StreamService.<StartSSEListenerAsync>d__16.MoveNext() in C:\Users...

I cant see any issues or logging in NodeJS and to be honest I dont know where to lookin in CPanel/WHM (would love thh advice on this please)

本文标签: nodejsC Server Side Event Intermittent IO Exception occuringStack Overflow