I want to implement some kind of round-robin connection to multiple RabbitMQ hosts with aio_pika:

import asyncio
 
import aio_pika
from loguru import logger
 
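async def on_message(msg):
    # Acknowledge each message once handled; without an ack,
    # prefetch_count=2 would stall the consumer after two messages.
    async with msg.process():
        logger.info(f"Message: {msg}")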
 
class RabbitManager:
    rabbit_urls = [
        "amqp://guest:guest@localhost:5672/",
        "amqp://guest:guest@localhost:5673/",
    ]
 
    def __init__(self):
        self.connection = None
 
    async def connect_to_available_broker(self):
        for rabbit_url in self.rabbit_urls:
            try:
                self.connection = await aio_pika.connect(rabbit_url, timeout=1)
            except Exception as e:
                logger.warning(f"Failed to connect to RabbitMQ {rabbit_url}: {e}")
            else:
                logger.info(f"Connected to RabbitMQ {rabbit_url}")
                return
 
        raise ConnectionError("Could not connect to any RabbitMQ url")
 
async def main():
    rabbit_manager = RabbitManager()
    await rabbit_manager.connect_to_available_broker()
 
    try:
        async with rabbit_manager.connection:
            channel = await rabbit_manager.connection.channel()
            await channel.set_qos(prefetch_count=2)
            queue = await channel.declare_queue('test_queue')
            logger.info("Starting consuming queue")
            await queue.consume(on_message)
            await asyncio.Future()
    except Exception as exc:
        logger.error(f"An error occurred: {exc}")
 
if __name__ == '__main__':
    asyncio.run(main())

This connects to the first available RabbitMQ URL when the script starts. Is it possible to detect a lost broker connection while the consumer is running and reconnect to another RabbitMQ URL?
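To make the goal concrete, here is a minimal, library-agnostic sketch of the rotation I have in mind. It is not part of aio_pika: `connect_with_failover`, `start`, `max_rounds` and `retry_delay` are hypothetical names, and `connect` stands for any coroutine function, for example `lambda url: aio_pika.connect(url, timeout=1)`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def connect_with_failover(
    urls: Sequence[str],
    connect: Callable[[str], Awaitable[Any]],
    start: int = 0,
    max_rounds: int = 3,
    retry_delay: float = 1.0,
) -> tuple[Any, int]:
    """Try the URLs in round-robin order, beginning at index `start`.

    Returns the connection and the index of the URL that worked, so the
    caller can resume from the following URL after a later failure.
    """
    last_exc = None
    for round_no in range(max_rounds):
        for offset in range(len(urls)):
            index = (start + offset) % len(urls)
            try:
                return await connect(urls[index]), index
            except Exception as exc:  # hypothetical policy: any error means "try the next URL"
                last_exc = exc
        # Every URL failed in this round; pause before the next round.
        if round_no < max_rounds - 1:
            await asyncio.sleep(retry_delay)
    raise ConnectionError("Could not connect to any URL") from last_exc
```

After a connection loss, calling it again with `start=(index + 1) % len(urls)` would move on to the next broker instead of retrying the one that just failed.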

I tried using close_callbacks, with on_close defined as a method of RabbitManager:

async def on_close(self, *args, **kwargs):
    logger.info("Trying to reconnect")
    await self.connect_to_available_broker()
 
self.connection.close_callbacks.add(self.on_close)

But it doesn't seem to work quite right.

For example, when I stop the script manually (CTRL+C), I see the following warning in the console:

Task was destroyed but it is pending!
task: <Task pending name='Task-25' coro=<OneShotCallback.__task_inner() running at /lib/python3.11/site-packages/aio_pika/tools.py:236>>
sys:1: RuntimeWarning: coroutine 'OneShotCallback.__task_inner' was never awaited

It looks like some aio_pika tasks were still pending when the event loop shut down.
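One possible explanation, which is an assumption and not a confirmed diagnosis: on CTRL+C the connection is closed as part of shutdown, the close callback fires, and a reconnect is scheduled just as the event loop is going away. The sketch below shows one way to guard against that with a shutdown flag. `ReconnectGuard` and its methods are hypothetical names, and whether aio_pika invokes a plain (non-async) callback like this on every kind of close is also an assumption.

```python
import asyncio


class ReconnectGuard:
    """Runs a reconnect coroutine on connection loss unless shutdown has begun."""

    def __init__(self, reconnect):
        self._reconnect = reconnect  # coroutine function, e.g. connect_to_available_broker
        self._shutting_down = False
        self._task = None

    def on_close(self, *args, **kwargs):
        # Must be called from inside the running event loop.
        if self._shutting_down:
            return  # intentional close: do not reconnect
        if self._task is None or self._task.done():
            # Keep a reference so the reconnect attempt can be cancelled later.
            self._task = asyncio.get_running_loop().create_task(self._reconnect())

    async def shutdown(self):
        # Mark the close as intentional and cancel any in-flight reconnect.
        self._shutting_down = True
        if self._task is not None and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
```

The idea would be to register `guard.on_close` in place of the async `on_close` and to call `await guard.shutdown()` in a `finally` block in `main()` before closing the connection, so that no reconnect task is left pending at exit.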
