I want to implement some kind of round-robin connection to multiple RabbitMQ hosts with aio_pika:
import asyncio
import aio_pika
from loguru import logger


async def on_message(msg):
    logger.info(f"Message: {msg}")


class RabbitManager:
    rabbit_urls = [
        "amqp://guest:guest@localhost:5672/",
        "amqp://guest:guest@localhost:5673/",
    ]

    def __init__(self):
        self.connection = None

    async def connect_to_available_broker(self):
        for rabbit_url in self.rabbit_urls:
            try:
                self.connection = await aio_pika.connect(rabbit_url, timeout=1)
            except Exception as e:
                logger.warning(f"Failed to connect to RabbitMQ {rabbit_url}: {e}")
            else:
                logger.info(f"Connected to RabbitMQ {rabbit_url}")
                return
        raise ConnectionError("Could not connect to any RabbitMQ url")


async def main():
    rabbit_manager = RabbitManager()
    await rabbit_manager.connect_to_available_broker()
    try:
        async with rabbit_manager.connection:
            channel = await rabbit_manager.connection.channel()
            await channel.set_qos(prefetch_count=2)
            queue = await channel.declare_queue('test_queue')
            logger.info("Starting consuming queue")
            await queue.consume(on_message)
            await asyncio.Future()
    except Exception as exc:
        logger.error(f"An error occurred: {exc}")


if __name__ == '__main__':
    asyncio.run(main())
This connects to the first available RabbitMQ URL when the script starts. Is it possible to detect a lost broker connection while the consumer is running and reconnect to another RabbitMQ URL?
I tried to use close_callbacks:
async def on_close(self, *args, **kwargs):
    logger.info("Trying to reconnect")
    await self.connect_to_available_broker()

self.connection.close_callbacks.add(self.on_close)
But it doesn't seem to work quite right.
For example, when I manually stop the script (Ctrl+C), I see the following warning in the console:
Task was destroyed but it is pending!
task: <Task pending name='Task-25' coro=<OneShotCallback.__task_inner() running at /lib/python3.11/site-packages/aio_pika/tools.py:236>>
sys:1: RuntimeWarning: coroutine 'OneShotCallback.__task_inner' was never awaited
It looks like some tasks inside aio_pika were still pending when the event loop shut down.
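One possible direction, sketched here without aio_pika so it can run on its own: let the close callback do nothing except set an asyncio.Event, and do the actual reconnecting in a single supervising coroutine that rotates through the URLs. Everything named below (FailoverSupervisor, connect, on_lost, on_connected, retry_delay) is hypothetical and not part of aio_pika; connect stands in for whatever wraps aio_pika.connect and registers the close callback. This is a sketch under those assumptions, not a confirmed fix for the warning above.

```python
import asyncio
import contextlib
import itertools


class FailoverSupervisor:
    """Keep one connection alive, rotating through URLs when it is lost.

    `connect(url, on_lost)` is a hypothetical coroutine function supplied by
    the caller. It must return a connection or raise, and it must arrange
    for `on_lost()` to be called when that connection drops.
    """

    def __init__(self, urls, connect, retry_delay=1.0):
        self._urls = itertools.cycle(urls)  # round-robin over the brokers
        self._attempts_per_round = len(urls)
        self._connect = connect
        self._retry_delay = retry_delay
        self._lost = None  # created in run(), inside the running loop
        self.connection = None

    async def _connect_next(self):
        # Try each URL at most once per round, continuing from where the
        # previous round stopped instead of restarting at the first URL.
        for _ in range(self._attempts_per_round):
            url = next(self._urls)
            self._lost.clear()
            try:
                conn = await self._connect(url, self._lost.set)
            except Exception as exc:
                print(f"connect failed for {url}: {exc}")
                continue
            return url, conn
        raise ConnectionError("no broker reachable in this round")

    async def run(self, on_connected):
        self._lost = asyncio.Event()
        while True:
            try:
                url, self.connection = await self._connect_next()
            except ConnectionError:
                await asyncio.sleep(self._retry_delay)
                continue
            # Channel, QoS and consumer setup would happen in this callback.
            await on_connected(url, self.connection)
            # Sleep until the close callback fires; cancelling run() (for
            # example on Ctrl+C) simply interrupts this wait.
            await self._lost.wait()


async def demo():
    """Drive the supervisor with a fake broker: 'a' is down, 'b' and 'c' work."""
    connected = asyncio.Queue()
    drop = []  # on_lost callbacks handed out by the fake connect

    async def fake_connect(url, on_lost):
        if url == "amqp://a":
            raise OSError("broker down")
        drop.append(on_lost)
        return f"connection to {url}"

    async def on_connected(url, conn):
        await connected.put(url)

    supervisor = FailoverSupervisor(
        ["amqp://a", "amqp://b", "amqp://c"], fake_connect, retry_delay=0
    )
    task = asyncio.create_task(supervisor.run(on_connected))
    seen = []
    for i in range(3):
        seen.append(await connected.get())
        if i < 2:
            drop[-1]()  # simulate the broker dropping the connection
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return seen  # ['amqp://b', 'amqp://c', 'amqp://b']


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With aio_pika, connect could presumably be a small wrapper that awaits aio_pika.connect(url, timeout=1) and then adds a plain function accepting *args to connection.close_callbacks, whose only job is to call on_lost(). Because the callback no longer awaits a reconnect itself, there is no reconnect coroutine left half-finished when the script is interrupted, although whether that removes the "Task was destroyed but it is pending" warning in every case is an assumption that needs testing against a real broker.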