
The Problem

I have a simplified example of an asynchronous program (Python 3.9) that does not work when exceptions are raised in futures, and I am looking for a way to terminate it gracefully.

In particular, when the number of failed requests exceeds the sum of the queue size and the number of workers, I end up in a situation where the queue is full (see COMMENT1 in the code snippet below) and I can no longer submit new requests, nor fail gracefully. I am looking for advice on how to avoid this situation by surfacing the fact that an exception occurred and shutting the entire program down. In this specific case, I do not need to re-raise the specific worker errors, as those are logged. Thank you in advance!

import asyncio

HEALTHY_REQUESTS = 10
MAX_WORKERS = 3
QUEUE_SIZE = 19
# REQUESTS >= HEALTHY_REQUESTS + (QUEUE_SIZE + MAX_WORKERS) + 1  --->  the queue fills up and the program stalls
REQUESTS = HEALTHY_REQUESTS + (QUEUE_SIZE + MAX_WORKERS) + 2


async def task_producer(requests):

    print(f'Starting task producer with: {requests}')
    request_queue = asyncio.Queue(maxsize=QUEUE_SIZE)  # bounded request queue

    print('Initializing futures of the queue')
    request_workers = [asyncio.ensure_future(request_consumer(request_queue)) for _ in range(MAX_WORKERS)]

    # Submit requests
    for req in requests:
        print(f'Putting request {req} into queue, {request_queue.qsize()}')
        # COMMENT1: we get stuck here if
        # REQUESTS >= HEALTHY_REQUESTS + (QUEUE_SIZE + MAX_WORKERS) + 2.
        # With exactly one request less we get stuck later instead, but I think
        # we can ignore that fortunate case, where we could still do something.
        await request_queue.put(req)

    # Wait for all requests to come back
    await request_queue.put(None)
    await asyncio.wait(request_workers)

    print('Getting results from the parser')
    return 0


async def request_consumer(request_queue):
    print('request consumer started')
    while True:
        request_info = await request_queue.get()
        await asyncio.sleep(0.5)
        if request_info is None:
            await request_queue.put(None)
            break
        elif request_info > HEALTHY_REQUESTS:
            raise RuntimeError(f'Breaking thing in make requests with request={request_info}')
        print(f'{request_info} - request consumer is finished with request')


if __name__ == '__main__':
    requests = list(range(REQUESTS))
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    producer = loop.create_task(task_producer(requests))
    producer_result = loop.run_until_complete(producer)
    loop.close()
    print(f'Output of producer: {producer_result}')

Solution Attempts

Attempt 1

I tried adding a check for exceptions raised in the worker futures, called inside the for loop just before putting a request (see COMMENT1). The code snippet of the worker status check is presented below.

This works in the contrived example above in the sense that it helps terminate the loop, but the program still gets stuck when putting None afterwards. In the actual program it does not behave the same way: somehow the queue fills up before this check has a chance to return False.

def workers_are_healthy(request_workers):
    for worker in request_workers:
        try:
            exception = worker.exception()
        except asyncio.InvalidStateError:
            # Worker is still running, so it has not raised anything yet
            continue
        if isinstance(exception, Exception):
            return False
    return True
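
For concreteness, here is a minimal sketch of how this check is wired into the submission loop (placement per COMMENT1; the early break is the only change):

# Inside task_producer, replacing the plain submission loop
for req in requests:
    if not workers_are_healthy(request_workers):
        print('A worker has failed; aborting submission')
        break
    # This can still block forever if the queue is already full
    await request_queue.put(req)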

Attempt 2

I found something that works, but I really do not like it... If I create an instance variable on the class this code is packaged in (effectively a global variable in the example above), I can use it in a separate task to stop the program. In particular, I create another task, which is then passed into loop.run_until_complete. Inside this task, I check every few seconds what the value of the global variable is. If an error occurs inside a specific worker (request_workers), I change its value and hope that this works. Feels very wrong... Roughly, the pattern looks like the sketch below.
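
A sketch of that pattern (stop_requested stands in for the instance variable described above):

import asyncio

stop_requested = False  # a failing worker sets this to True

async def watchdog(tasks):
    # Poll the flag every few seconds; cancel everything once it flips
    while not stop_requested:
        await asyncio.sleep(2)
    for task in tasks:
        task.cancel()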

Comments

  • When a worker (a.k.a. consumer) has a problem (an exception), it exits. I don't understand why, could you please explain? Losing workers from the pool must lead to a slow-down and then to a halt. I would expect a worker to recover. I mean the whole request processing should be guarded with a try-except in order to always be able to continue with the next request. – VPfB
  • @VPfB, are you suggesting that workers raising exceptions is a bad idea to begin with? The idea for them to fail was to raise an exception as soon as the services we are interacting with start to misbehave, to end the entire program. – J.K.
  • FYI, trio and anyio implement level-based rather than edge-based cancellation, and when used correctly generally tend to avoid the class of problem you're dealing with altogether. – Charles Duffy
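
As a footnote to the last comment, a rough sketch of what the level-based style looks like with anyio task groups and memory object streams (an illustration only, not a drop-in replacement for the code above; a worker exception cancels the whole group and propagates out of anyio.run, possibly wrapped in an exception group):

import anyio

async def worker(receive_stream):
    async for request in receive_stream:  # ends when the send side is closed
        if request > 10:
            raise RuntimeError(f'Breaking things with request={request}')

async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size=19)
    async with anyio.create_task_group() as tg:
        for _ in range(3):
            tg.start_soon(worker, receive_stream)
        async with send_stream:
            for request in range(30):
                await send_stream.send(request)

anyio.run(main)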

2 Answers


With a little refactoring we can create a main task that monitors the completion of all tasks and gets notified as soon as any task ends with an exception.

The main task creates the consumer and producer tasks. It then calls asyncio.wait on those tasks with return_when=asyncio.FIRST_EXCEPTION, so the call returns as soon as any task completes with an exception. wait returns a "done" set and a "pending" set. The "done" set is iterated to detect which task, if any, completed with an exception. The "pending" set is then iterated so that any still-running tasks are cancelled. If all tasks completed normally, processing continues by calling get_parser_results.

import asyncio

HEALTHY_REQUESTS = 10
MAX_WORKERS = 3
QUEUE_SIZE = 19
REQUESTS = 30

async def task_producer(request_queue, requests):

    print(f'Starting task producer with: {requests}')

    # Submit requests
    for req in requests:
        print(f'Putting request {req} into queue, {request_queue.qsize()}')
        await request_queue.put(req)

    for _ in range(MAX_WORKERS):
        await request_queue.put(None)  # Tell workers to terminate

async def get_parser_results():
    """This is involed if and only if all requests have completed normally."""
    print('Getting results from the parser')
    return 0


async def request_consumer(request_queue):
    print('request consumer started')

    while True:
        request_info = await request_queue.get()
        if request_info is None:
            break

        await asyncio.sleep(0.5)  # Emulate work
        if request_info > HEALTHY_REQUESTS:
            raise RuntimeError(f'Breaking thing in make requests with request={request_info}')

async def main():
    request_queue = asyncio.Queue(maxsize=QUEUE_SIZE)  # bounded request queue
    print('Initializing consumers')
    tasks = [asyncio.create_task(request_consumer(request_queue)) for _ in range(MAX_WORKERS)]

    requests = list(range(REQUESTS))
    producer_task = asyncio.create_task(task_producer(request_queue, requests))
    tasks.append(producer_task)

    # If no exception, then this call will only return when all tasks have completed
    # successfully and pending will be empty:
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    had_exception = False
    for task in done:
        try:
            task.result()  # re-raises the task's exception, if any
        except Exception:
            had_exception = True
            print(f'Exception in task {task}')
            break

    # Now cancel any remaining tasks:
    if pending:
        for task in pending:
            task.cancel()

        # wait for all canceled tasks to finish
        await asyncio.wait(pending)

    if not had_exception:
        # All requests have completed normally, so
        # we can now get the parser results
        return await get_parser_results()

if __name__ == '__main__':
    producer_result = asyncio.run(main())
    print(f'Output of producer: {producer_result}')

Prints:

Initializing consumers
request consumer started
request consumer started
request consumer started
Starting task producer with: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
Putting request 0 into queue, 0
Putting request 1 into queue, 1
Putting request 2 into queue, 2
Putting request 3 into queue, 3
Putting request 4 into queue, 4
Putting request 5 into queue, 5
Putting request 6 into queue, 6
Putting request 7 into queue, 7
Putting request 8 into queue, 8
Putting request 9 into queue, 9
Putting request 10 into queue, 10
Putting request 11 into queue, 11
Putting request 12 into queue, 12
Putting request 13 into queue, 13
Putting request 14 into queue, 14
Putting request 15 into queue, 15
Putting request 16 into queue, 16
Putting request 17 into queue, 17
Putting request 18 into queue, 18
Putting request 19 into queue, 19
Putting request 20 into queue, 17
Putting request 21 into queue, 18
Putting request 22 into queue, 19
Putting request 23 into queue, 17
Putting request 24 into queue, 18
Putting request 25 into queue, 19
Putting request 26 into queue, 17
Putting request 27 into queue, 18
Putting request 28 into queue, 19
Putting request 29 into queue, 17
Exception in task <Task finished name='Task-4' coro=<request_consumer() done, defined at C:\Booboo\test.py:27> exception=RuntimeError('Breaking thing in make requests with request=11')>
Output of producer: None

In the comments I wrote that in a standard worker pool, a problem with one request should not crash a worker.

In general, terminating an asyncio task with an exception when nobody checks that task's result is not a good idea.

Anyway, you want to shut down the whole program after the first error, so it might be OK to let a worker task die; somebody just has to notice it.

Your "attempt #2" contains one fine idea: creating a separate shutdown task. Instead of polling a global variable, let it monitor all worker tasks. When any of them exits and its result is an exception, trigger a shutdown immediately. Shutting down usually means cancelling all remaining tasks and waiting for them to exit. Otherwise (no exception), wait for the remaining workers to exit normally.

Functions that you might find useful in the shutdown task (a sketch combining them follows the list):

  • await asyncio.wait() with the return_when=asyncio.FIRST_COMPLETED parameter
  • alternative: asyncio.as_completed()
  • Task.result() or Task.exception()
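
For illustration, a sketch of such a shutdown task built from those pieces (the names shutdown_monitor and workers are mine, not from the question):

import asyncio

async def shutdown_monitor(workers):
    pending = set(workers)
    while pending:
        # Wake up as soon as any worker finishes, for whatever reason
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        if any(not t.cancelled() and t.exception() is not None for t in done):
            # A worker died with an exception: cancel the rest and wait for them
            for t in pending:
                t.cancel()
            if pending:
                await asyncio.wait(pending)
            return False
    # No exception: all workers exited normally
    return True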

A small side note: please check out asyncio.run and asyncio.create_task. They are more recent versions of some of the calls you are using; a minimal sketch follows.
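
For example:

import asyncio

async def main():
    # create_task replaces ensure_future for scheduling coroutines
    worker = asyncio.create_task(asyncio.sleep(0.5))
    await worker

# run replaces new_event_loop / run_until_complete / close
asyncio.run(main())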
