I was debugging some production code in my application. I have replaced the actual code with dummy code below, but the functionality remains the same.

import asyncio

class RequestHandler:
    def __init__(self, max_concurrent_requests):
        # Semaphore to limit the number of concurrent requests
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def another_async_function(self, url):
        # This function will perform the actual async task, like a network request
        print(f"Start task for {url}")
        await asyncio.sleep(5)  # Simulate network delay
        print(f"End task for {url}")
        return url

    async def fake_request(self, url):
        # Using async with the semaphore to limit concurrency
        async with self.semaphore:
            # Instead of directly using await, call and return the result of another async function
            return self.another_async_function(url)

    async def next(self, urls):
        tasks = []
        # Create and schedule tasks for each URL
        for url in urls:
            print(f"Scheduling task for {url}...")
            task = asyncio.create_task(await self.fake_request(url))
            tasks.append(task)
        
        # Yield the results as tasks complete
        for task in tasks:
            result = await task  # Wait for task to complete and get result
            yield result  # Yield result as each task completes

    async def main(self):
        urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7"]
        
        # Get the async generator from next() and process each result as it's yielded
        async for result in self.next(urls):
            print(f"Processed result: {result}")

# Running the main function
request_handler = RequestHandler(max_concurrent_requests=2)  # Limit concurrency to 2
asyncio.run(request_handler.main())

I believe that the use of await inside create_task is wrong: task = asyncio.create_task(await self.fake_request(url))

I was expecting to see sequential results because of this.

But I am getting parallel execution for all the URLs. How is that possible? Shouldn't the for loop inside the next method block for each URL, since there is an await inside create_task?

Here is the sample result:

Scheduling task for url1...
Scheduling task for url2...
Scheduling task for url3...
Scheduling task for url4...
Scheduling task for url5...
Scheduling task for url6...
Scheduling task for url7...
Start task for url1
Start task for url2
Start task for url3
Start task for url4
Start task for url5
Start task for url6
Start task for url7
End task for url1
End task for url2
End task for url3
End task for url4
End task for url5
End task for url6
End task for url7
Processed result: url1
Processed result: url2
Processed result: url3
Processed result: url4
Processed result: url5
Processed result: url6
Processed result: url7


...Program finished with exit code 0
Press ENTER to exit console.

What am I missing here? How does this code work?

I asked ChatGPT, and it explains that the use of await inside create_task is wrong, but it fails to explain how this code was even able to achieve concurrent execution.

I would also like to understand what is actually wrong with this code.

The semaphore also seems to have no effect, as all 7 URL logs show concurrent execution even though the semaphore limit is 2.



2 Answers


Semaphores have various uses. A couple of examples:

  1. Implement a critical section, i.e. code that can only be run against a shared resource by one task at a time.
  2. Throttling. For example, we only want to have at most N tasks running concurrently, and when there are N tasks running we can only start a new task when a currently executing task completes. This is your situation (a minimal standalone sketch follows this list).
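As a point of comparison, here is a minimal standalone sketch (hypothetical names, not the asker's code) of the throttling case, where several tasks contend for the semaphore and only two run at a time:

import asyncio

sem = asyncio.Semaphore(2)  # allow at most 2 workers in the guarded section

async def worker(n):
    async with sem:                 # each task holds the semaphore while it works
        print(f"worker {n} running")
        await asyncio.sleep(1)      # simulate work
    print(f"worker {n} done")

async def main():
    # Five tasks contend for the semaphore; they run two at a time.
    await asyncio.gather(*(worker(n) for n in range(5)))

asyncio.run(main())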

Note that in both examples we are dealing with multiple (at least two) tasks trying to acquire a semaphore. In your code you have:

    async def fake_request(self, url):
        # Using async with the semaphore to limit concurrency
        async with self.semaphore:
            # Instead of directly using await, call and return the result of another async function
            return self.another_async_function(url)

Q: How many tasks are concurrently calling self.fake_request? A: self.fake_request is called only in one place:

    async def next(self, urls):
        tasks = []
        # Create and schedule tasks for each URL
        for url in urls:
            print(f"Scheduling task for {url}...")
            task = asyncio.create_task(await self.fake_request(url))
            tasks.append(task)

All calls to self.fake_request are being made from a single task in a loop. The result is that self.fake_request is never blocked from acquiring the semaphore, which it then immediately releases as soon as it returns. This semaphore accomplishes nothing. If you are trying to throttle the number of URLs being processed concurrently, then self.fake_request should attempt to acquire the semaphore but not immediately release it. The semaphore should only be released when a URL-processing task completes:


    async def another_async_function(self, url):
        # This function will perform the actual async task, like a network request
        print(f"Start task for {url}")
        await asyncio.sleep(5)  # Simulate network delay
        print(f"End task for {url}")
        self.semaphore.release()  # Task is done, release the semaphore
        return url


    async def fake_request(self, url):
        # Using async with the semaphore to limit concurrency
        await self.semaphore.acquire()
        return self.another_async_function(url)

Prints:

Scheduling task for url1...
Scheduling task for url2...
Scheduling task for url3...
Start task for url1
Start task for url2
End task for url1
End task for url2
Scheduling task for url4...
Scheduling task for url5...
Start task for url3
Start task for url4
End task for url3
End task for url4
Scheduling task for url6...
Scheduling task for url7...
Start task for url5
Start task for url6
End task for url5
End task for url6
Processed result: url1
Processed result: url2
Processed result: url3
Processed result: url4
Processed result: url5
Processed result: url6
Start task for url7
End task for url7
Processed result: url7
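For completeness, an equivalent alternative (a sketch, not part of the original answer) is to keep the async with, move the await inside it, and drop the await from the create_task call; only these two methods of RequestHandler change:

    async def fake_request(self, url):
        # Hold the semaphore for the whole request, inside the task itself
        async with self.semaphore:
            return await self.another_async_function(url)

    async def next(self, urls):
        # Schedule all tasks up front; the semaphore limits how many run at once
        tasks = [asyncio.create_task(self.fake_request(url)) for url in urls]
        # Yield results in scheduling order as the tasks complete
        for task in tasks:
            yield await task

Either way, the essential point is the same: the semaphore has to be held by the code that actually does the work, so that multiple tasks contend for it.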

Booboo's answer is excellent. I'm not posting this as an alternative but as additional information that I think might be useful.

Consider this snippet (assume it's part of an async def function, and that some_async_function is also an async def function):

x1 = some_async_function()
x2 = await some_async_function()

x1 is a "coroutine object." The code inside some_async_function has not run yet. In order to run the coroutine (execute its code), further action is required. It is closely analogous to a generator.

x2 might be of any type. It is the result returned by some_async_function. The await keyword causes the code inside some_async_function to run, and the entire expression await some_async_function() evaluates to the returned object.
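A quick runnable illustration of that difference, using a made-up some_async_function that just returns a number:

import asyncio

async def some_async_function():
    return 42

async def demo():
    x1 = some_async_function()        # coroutine object; the body has not run
    print(type(x1))                   # <class 'coroutine'>
    x2 = await some_async_function()  # runs the body and gives back the result
    print(x2)                         # 42
    x1.close()                        # tidy up the never-awaited coroutine

asyncio.run(demo())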

In your code, this line:

task = asyncio.create_task(await self.fake_request(url))

is logically equivalent to these two lines:

x = await self.fake_request(url)
task = asyncio.create_task(x)

The code for the Task that is created is not in self.fake_request; it's in the object that is returned by self.fake_request. In other words, the Task actually runs another_async_function.

So you could have written simply:

task = asyncio.create_task(self.another_async_function(url))

and removed the extra function, along with the Semaphore. As Booboo pointed out, the Semaphore did nothing. His answer explains how to get the behavior you want. I thought it might be valuable to explain why your original approach didn't work.

I hate to argue with ChatGPT, but putting an await expression inside a call to asyncio.create_task is not necessarily wrong. As long as you understand the difference between x1 and x2 in my first example, it might be correct. But it is definitely an odd construction and probably should be classified as a "code smell."
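For example, this contrived sketch (all names hypothetical) is perfectly legal: the awaited call is an async "factory" that returns a coroutine object, which is exactly what create_task expects:

import asyncio

async def process(url):
    await asyncio.sleep(0.1)   # pretend to do the real work
    return url

async def make_worker(url):
    await asyncio.sleep(0)     # e.g. await some async setup or lookup
    return process(url)        # return a coroutine object, not its result

async def main():
    # create_task(await make_worker(...)) is valid here: the await yields a
    # coroutine object, and create_task schedules that coroutine as a Task.
    task = asyncio.create_task(await make_worker("url1"))
    print(await task)          # url1

asyncio.run(main())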
