Creating a stoppable background task in Python
I want to run some function in the background, but be able to shut it down at will. I tried a bunch of solutions, the most obvious of which is asyncio tasks:
import asyncio
from typing import Any, Generator

# Data source
DataSource = Generator[int, Any, None]

def create_data_source() -> DataSource:
    ...

# Data processing - should be able to run in the background until being shut down
async def data_processor(source: DataSource) -> None:
    try:
        for item in source:
            print("Working")
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print("Cancelled")

async def main():
    # Run the function
    data_source = create_data_source()
    processing_task = asyncio.create_task(data_processor(data_source))
    # Do other stuff
    await asyncio.sleep(3)
    # Shut the function down
    processing_task.cancel()

asyncio.run(main())
The problem is - once the task starts, the "Working" loop never terminates. I tried making the loop async, I tried sending it a shutdown signal using asyncio.Event, and I even tried rewriting the function using concurrent.futures, but nothing works. What am I missing?
Note:
- I searched through previous Q&As, but haven't found a solution.
- This is running on Python 3.11 for dependency reasons, but if newer versions have the solution then I might be able to update.
2 Answers
Your data_processor coroutine never issues an await, so it never yields control back to the event loop, which means cancellation is never delivered to it; it is a poor candidate for an async function. You should consider running it as a sync function using run_in_executor, with either a thread pool (the default) or a process pool created with concurrent.futures.ProcessPoolExecutor, depending on how CPU-bound it is. For example:
Using a Multithreading Pool
import asyncio
import time
from threading import Event

# Data processing - should be able to run in the background until being shut down
def data_processor(stop_event) -> None:
    start_time = time.time()
    try:
        while True:
            if stop_event.is_set():
                break
            print("Working")
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {start_time}, ended at {time.time()}")

async def main():
    stop_event = Event()
    # We don't need a larger pool:
    loop = asyncio.get_running_loop()
    awaitable = loop.run_in_executor(None, data_processor, stop_event)
    # Do other stuff
    await asyncio.sleep(3)
    # Shut the function down
    stop_event.set()
    await awaitable  # Wait for task to complete

if __name__ == '__main__':
    asyncio.run(main())
Prints:
Working
Working
...
Working
Started at 1738589602.7931993, ended at 1738589605.79333
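Since the asker is on Python 3.11, it is worth noting that `asyncio.to_thread` (added in 3.9) is a thin convenience wrapper over `loop.run_in_executor(None, ...)` and reads slightly cleaner. A minimal sketch of the same thread-pool approach (shortened sleeps are just to keep the demo quick):

```python
import asyncio
import time
from threading import Event

def data_processor(stop_event: Event) -> None:
    # Plain sync function; polls the event instead of relying on cancellation
    while not stop_event.is_set():
        print("Working")
        time.sleep(0.05)  # simulated work

async def main() -> None:
    stop_event = Event()
    # asyncio.to_thread(...) is equivalent to run_in_executor with the default pool
    worker = asyncio.create_task(asyncio.to_thread(data_processor, stop_event))
    # Do other stuff
    await asyncio.sleep(0.3)
    # Shut the function down
    stop_event.set()
    await worker  # wait for the thread to finish

asyncio.run(main())
```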
Using a Multiprocessing Pool
Here we cannot directly pass a multiprocessing.Event instance to the worker function and must instead use a pool initializer to initialize each pool process with a global reference to the event:
import asyncio
import time
from multiprocessing import Event
from concurrent.futures import ProcessPoolExecutor

def init_pool(_stop_event) -> None:
    global stop_event
    stop_event = _stop_event

# Data processing - should be able to run in the background until being shut down
def data_processor() -> None:
    start_time = time.time()
    try:
        while True:
            if stop_event.is_set():
                break
            print("Working")
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {start_time}, ended at {time.time()}")

async def main():
    stop_event = Event()
    # We don't need a larger pool:
    executor = ProcessPoolExecutor(1, initializer=init_pool, initargs=(stop_event,))
    loop = asyncio.get_running_loop()
    awaitable = loop.run_in_executor(executor, data_processor)
    # Do other stuff
    await asyncio.sleep(3)
    # Shut the function down
    stop_event.set()
    await awaitable  # Wait for task to complete

if __name__ == '__main__':
    asyncio.run(main())
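An alternative that avoids the pool initializer, assuming the overhead of a `multiprocessing.Manager` is acceptable: a manager-backed Event is a proxy object, and proxies pickle cleanly, so one *can* be passed directly as a task argument. A sketch (sleeps shortened for demonstration):

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

def data_processor(stop_event) -> None:
    # stop_event is a Manager proxy, safe to pass across process boundaries
    while not stop_event.is_set():
        print("Working")
        time.sleep(0.05)

async def main() -> None:
    with Manager() as manager:
        stop_event = manager.Event()
        with ProcessPoolExecutor(1) as executor:
            loop = asyncio.get_running_loop()
            # The proxy is passed directly; no initializer needed
            awaitable = loop.run_in_executor(executor, data_processor, stop_event)
            # Do other stuff
            await asyncio.sleep(0.3)
            # Shut the function down
            stop_event.set()
            await awaitable

if __name__ == "__main__":
    asyncio.run(main())
```

The trade-off is an extra manager process and a round-trip to it on every `is_set()` call, so the initializer approach above is cheaper for tight loops.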
Easy, But Perhaps Not the Best Solution
Last and least, you could just insert calls to await asyncio.sleep(0) to give other async tasks a chance to run:
import asyncio
import time

# Data processing - should be able to run in the background until being shut down
async def data_processor() -> None:
    start_time = time.time()
    try:
        while True:
            print("Working")
            await asyncio.sleep(0)
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {start_time}, ended at {time.time()}")

async def main():
    processing_task = asyncio.create_task(data_processor())
    # Do other stuff
    await asyncio.sleep(3)
    # Shut the function down
    processing_task.cancel()

asyncio.run(main())
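Why this fixes the original problem: `Task.cancel()` raises `asyncio.CancelledError` inside the coroutine at its next `await` point, so a loop with no `await` can never receive it. A minimal sketch showing the cancellation being delivered at the `sleep(0)` (iteration counting instead of printing, to keep the output short):

```python
import asyncio

async def data_processor() -> None:
    count = 0
    try:
        while True:
            count += 1  # stands in for "Working"
            await asyncio.sleep(0)  # yields control; cancellation lands here
    except asyncio.CancelledError:
        print(f"Cancelled after {count} iterations")
        raise  # re-raise so the task ends up in the cancelled state

async def main() -> None:
    task = asyncio.create_task(data_processor())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    print("cancelled:", task.cancelled())

asyncio.run(main())
```

Note that on Python 3.8+ `CancelledError` derives from `BaseException`, not `Exception`, so the `except Exception` clause in the original code will not swallow it.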
Perhaps something like this will work:
import threading
import time

class MyTask:
    def __init__(self):
        self.thread = None
        self.stop_event = threading.Event()

    def _wrapper(self, func, *args, **kwargs):
        """Wrapper function to run the task and check for stop signal."""
        try:
            func(self.stop_event, *args, **kwargs)
        except Exception as e:
            print(f"Task encountered an error: {e}")

    def start_task(self, func, *args, **kwargs):
        """Starts the function in a separate thread."""
        if self.thread and self.thread.is_alive():
            print("Task is already running.")
            return
        self.stop_event.clear()
        self.thread = threading.Thread(target=self._wrapper, args=(func, *args), kwargs=kwargs)
        self.thread.start()

    def stop_task(self):
        """Stops the running task."""
        if self.thread and self.thread.is_alive():
            self.stop_event.set()  # Signal the function to stop
            print("Stopping the task...")
        else:
            print("No active task to stop.")

def example_task(stop_event):
    """Example function that stops when the stop_event is set."""
    for i in range(10):
        if stop_event.is_set():
            print("Task stopping early.")
            return
        print(f"Running... {i}")
        time.sleep(1)
    print("Task completed.")

if __name__ == "__main__":
    task = MyTask()
    task.start_task(example_task)
    time.sleep(3)
    task.stop_task()
Using the MyTask class you can create a background task on a separate thread. The general idea is to write the background task as a function (the example_task function above) that accepts stop_event as its first argument; it can accept other arguments as well, of course. Inside this function you must implement logic that checks whether stop_event is set and returns if it is.
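One refinement worth considering (a sketch, not part of the answer above): `stop_task` only signals the thread, so control returns to the caller before the work has necessarily finished. Joining the thread after setting the event guarantees the loop has actually exited:

```python
import threading
import time

def worker(stop_event: threading.Event) -> None:
    # Poll the event between units of work, as in example_task above
    while not stop_event.is_set():
        time.sleep(0.05)

stop_event = threading.Event()
thread = threading.Thread(target=worker, args=(stop_event,))
thread.start()
time.sleep(0.2)             # let it run for a bit
stop_event.set()            # signal the thread to stop
thread.join(timeout=5)      # block until the loop has really exited
print("alive:", thread.is_alive())
```

In the MyTask class this would amount to calling `self.thread.join()` at the end of `stop_task`, optionally with a timeout so a misbehaving task cannot hang the caller forever.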
await asyncio.sleep(0) (or several of them, depending on the code) could be inserted into the main loop. The question is how long it takes to reach such a point on average, or in the worst case. – VPfB Commented Feb 3 at 9:39