admin管理员组

文章数量:1401849

I have an application that transfers messages between RabbitMQ and Azure, or vice versa.

Since upgrading to Camel 4+, it is required to use spring-rabbitmq instead of rabbitmq.

My RabbitMQ hosts are dynamic, so I previously created my endpoints like this:

private static String getRabbitMQEndpointURI(final MessageBrokerConnectionsProperties cnxs,
        final String exchangeName) {
    final MessageBrokerConnectionProperties cnx = cnxs.getServiceConnection();
    final StringBuilder uri = new StringBuilder(512)
            .append("rabbitmq:").append(exchangeName)
            .append("?hostname=").append(cnx.getHost())
            .append("&portNumber=").append(cnx.getPort());
    final Optional<String> virtualHost = cnx.getVirtualHost();
    if (virtualHost.isPresent()) {
        uri.append("&vhost=").append(virtualHost.get());
    }
    return uri.toString();
}

Now, with spring-rabbitmq, I changed it to:

private String getSpringRabbitMQEndpointURI(final String brokerName, String exchange, String queues) {
    StringBuilder builder = new StringBuilder("spring-rabbitmq:").append(exchange).append("?");
    builder.append("connectionFactory=#").append(brokerName);
    if (queues != null) {
        builder.append("&queues=").append(queues);
    }
    return builder.toString();
}

Since my RabbitMQ endpoints are dynamic and unknown at application startup, I tried creating ConnectionFactory instances on the fly before creating the endpoint:

final String endpointUri = getSpringRabbitMQEndpointURI(cnxs.getReference(), exchangeName, queues);
rabbitConnectionFactory.createConnection(cnxs);
final SpringRabbitMQEndpoint endpoint = camelCtx.getEndpoint(endpointUri, SpringRabbitMQEndpoint.class);

Here is how I create the ConnectionFactory:

public void createConnection(MessageBrokerConnectionsProperties brokerProperties) {
    if (connectionFactoryMap.get(brokerProperties.getReference()) != null) {
        log.info("ConnectionFactory spring-rabbit {} already created", brokerProperties.getReference());
        return;
    }
    MessageBrokerConnectionProperties cnx = brokerProperties.getServiceConnection();
    log.info("Creating RabbitMQ broker: {}", brokerProperties.getReference());
    CachingConnectionFactory factory = new CachingConnectionFactory(cnx.getHost(), cnx.getPort());
    factory.setUsername(cnx.getUser());
    factory.setPassword(String.valueOf(cnx.getPassword()));
    factory.setVirtualHost(cnx.getVirtualHost().orElse("/"));
    if (cnx.isSslEnabled()) {
        try {
            factory.getRabbitConnectionFactory().useSslProtocol("TLS"); // or SSLv3
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            log.error("Failed to enable TLS", e);
        }
    }
    factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
    factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(3000);

    // Add to Spring context dynamically
    ConfigurableApplicationContext configurableContext = (ConfigurableApplicationContext) context;
    DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) configurableContext.getBeanFactory();
    beanFactory.registerSingleton(brokerProperties.getReference(), factory);
    beanFactory.autowireBean(factory);

    connectionFactoryMap.put(brokerProperties.getReference(), factory);
}

However, this does not work. I always get the following error when resolving the endpoint:

Failed to resolve endpoint: spring-rabbitmq://distribution.canaux?connectionFactory=#localhost-5672-dev&queues=crd_distribution.canaux_qT_localhost-5673-dev 
due to: Error binding property (connectionFactory=#localhost-5672-dev) with name: connectionFactory on bean: spring-rabbitmq://my_exchanges?connectionFactory=%23localhost-5672-dev&queues=my_queue with value: #localhost-5672-dev

How can I properly register dynamic RabbitMQ ConnectionFactory instances in Spring Boot so that Camel can resolve them at runtime?

I tried to create all ConnectionFactory before the new CamelContext creation, but it's same. It's like camelcontext not registred le spring context...

本文标签: Dynamic RabbitMQ ConnectionFactory with SpringRabbitMQ in Apache Camel 4Stack Overflow