I'm trying to upload files to multiple SFTP servers, with the target chosen per request. I read the SFTP connection details from the database and create a session factory, which I store in a map of session factories by key. Now I need a message handler that dynamically uses the session factory from that map, but I'm not sure how to wire the map and the message handler together, and I couldn't find enough documentation. My code is below. What is the best approach for this? I looked at Spring Integration flows, but I couldn't work out how to use them dynamically. Can someone please help?

Message builder

public class SFTPService {
   
    private final MessageChannel uploadEmployerSftpChannel;

    @Retryable(
            retryFor = { MessagingException.class, IOException.class },
            backoff = @Backoff(delay = 5000)
    )
    public void uploadFromSftpConnection(@NonNull final SftpConnection sftpConnection,
                                         @NonNull final Resource file,
                                         @NonNull final String groupId) throws IOException {
        if (UploadEnabled) {
            log.info("Uploading file {} to server via InputStream with sftpRemoteDir header: {}", file.getFilename(), sftpConnection.getRemoteDirectory());

            final Message<InputStream> message = MessageBuilder.withPayload(file.getInputStream())
                    .setHeader(SFTP_REMOTE_DIR, sftpConnection.getRemoteDirectory())
                    .setHeader(FILE_NAME, file.getFilename())
                    .setHeader(GROUP_ID, groupId)
                    .build();
            uploadEmployerSftpChannel.send(message);
            log.info("File successfully uploaded to client server path: {}", sftpConnection.getRemoteDirectory());
        } else {
            log.info("upload is disabled. Skipping upload of file {} to client server.", file.getFilename());
        }
    }
}

SFTP Message handler

@RequiredArgsConstructor
@Configuration
public class SftpRuntimeSessionFactoryLocator {

    private final Map<Object, SessionFactory<SftpClient.DirEntry>> sftpSessionFactoryMap = new HashMap<>();
    private final SftpConnectionService sftpConnectionService;

    public SessionFactory getSessionFactory(final Object groupId) {
        if (!sftpSessionFactoryMap.containsKey(groupId)) {
            sftpSessionFactoryMap.put(groupId, generateSessionFactory(groupId.toString()));
        }
        return sftpSessionFactoryMap.get(groupId);
    }

    public SessionFactory<SftpClient.DirEntry> generateSessionFactory(final String key) {
        //get sftp connection details from database
        final SftpConnection sftpConnection = getSftpConnection(key);

        final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpConnection.getHost());
        factory.setPort(sftpConnection.getPort());
        factory.setPassword(sftpConnection.getPassword());
        factory.setUser(sftpConnection.getUser());
        factory.setAllowUnknownKeys(sftpConnection.isAllowUnknownKeys());
        return new CachingSessionFactory<>(factory);
    }

    private SftpConnection getSftpConnection(final String groupId) {
        return sftpConnectionService.getSFtpConnectionDetailsByGroupId(groupId);
    }

    @Bean
    public MessageChannel uploadEmployerSftpChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "uploadEmployerSftpChannel")
    public MessageHandler dynamicSftpMessageHandler() {
        return message -> {
            final SessionFactory sftpSessionFactory = getSessionFactory(message.getHeaders().get("groupId"));

            final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory);
            handler.setUseTemporaryFileName(false);

            handler.setRemoteDirectoryExpression(new FunctionExpression<Message<?>>(msg
                    -> msg.getHeaders().get("sftpRemoteDir", String.class)
            ));

            handler.setFileNameGenerator(msg -> {
                if (msg.getHeaders().containsKey("filename")) {
                    return msg.getHeaders().get("filename", String.class);
                }
                if (msg.getPayload() instanceof File) {
                    return ((File) msg.getPayload()).getName();
                }
                throw new IllegalArgumentException("filename not provided for SFTP Message");
            });
        };
    }
}
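
A side note on the map above: `getSessionFactory` does a `containsKey` check followed by a `put` on a plain `HashMap`, which may create duplicate factories or corrupt the map if two requests arrive at once. A minimal, framework-free sketch of the same lazy per-key cache using `ConcurrentHashMap.computeIfAbsent` is shown below. `PerKeyCache` and the string "factories" are hypothetical stand-ins for illustration only; in the real class the creator function would presumably be `generateSessionFactory`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: caches one value per key, creating it on first use.
final class PerKeyCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> creator;

    PerKeyCache(Function<K, V> creator) {
        this.creator = creator;
    }

    V get(K key) {
        // computeIfAbsent applies the creator atomically, at most once per key.
        return cache.computeIfAbsent(key, creator);
    }
}

public class PerKeyCacheDemo {
    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // The String value stands in for a session factory built from DB details.
        PerKeyCache<String, String> cache = new PerKeyCache<>(groupId -> {
            created.incrementAndGet();
            return "factory-for-" + groupId;
        });

        System.out.println(cache.get("789"));   // created on first access
        System.out.println(cache.get("789"));   // served from the cache
        System.out.println("factories created: " + created.get());
    }
}
```

The creator function must not return `null` and must not modify the same map while it runs.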

When I try to connect, I get the following error:

Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1b030042] (sftpRuntimeSessionFactoryLocator.dynamicSftpMessageHandler.serviceActivator)], failedMessage=GenericMessage [payload=sun.nio.ch.ChannelInputStream@378c9b97, headers={sftpRemoteDir=/target/outbound, filename=testfile.txt, id=aa6242a1-fce3-6ad4-c239-04131046f456, groupId=789, timestamp=1742477124410}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:361)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:331)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304)
    at company.notification.service.SftpService.uploadFromSftpConnection(SftpService.java:104)
    ... 158 more
Caused by: org.springframework.messaging.DestinationResolutionException: no output-channel or replyChannel header available
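
A possible reading of this trace, offered as an assumption rather than a confirmed diagnosis: because `dynamicSftpMessageHandler()` carries `@ServiceActivator` without `@Bean`, the framework appears to invoke the method itself for each message (note the `MethodInvokingMessageProcessor` in the trace) and to treat the returned `MessageHandler` lambda as a reply, for which no output channel exists. The lambda also builds an `SftpMessageHandler` but never calls `handleMessage` on it. Spring Integration ships a `DelegatingSessionFactory` that selects the real factory through a key bound to the current thread (`setThreadKey` / `clearThreadKey`), so a single statically configured handler can serve many targets. The sketch below models only that idea in plain Java; `ThreadKeyedDelegate` and the URL strings are hypothetical and are not Spring's implementation.

```java
import java.util.Map;
import java.util.function.Function;

// Simplified, hypothetical model of a delegate that picks its target
// from a key bound to the calling thread. NOT Spring's actual class.
final class ThreadKeyedDelegate<T> {
    private final ThreadLocal<Object> threadKey = new ThreadLocal<>();
    private final Function<Object, T> locator;

    ThreadKeyedDelegate(Function<Object, T> locator) {
        this.locator = locator;
    }

    void setThreadKey(Object key) {
        threadKey.set(key);
    }

    void clearThreadKey() {
        threadKey.remove();
    }

    // Resolves the target for whichever key the current thread has bound.
    T current() {
        Object key = threadKey.get();
        if (key == null) {
            throw new IllegalStateException("no key bound to this thread");
        }
        return locator.apply(key);
    }
}

public class ThreadKeyedDelegateDemo {
    public static void main(String[] args) {
        // Stand-ins for per-group session factories.
        Map<Object, String> targets = Map.of("789", "sftp://host-a", "123", "sftp://host-b");
        ThreadKeyedDelegate<String> delegate = new ThreadKeyedDelegate<>(targets::get);

        delegate.setThreadKey("789");            // bind before sending the message
        try {
            // A single shared handler would resolve its session factory here.
            System.out.println("uploading via " + delegate.current());
        } finally {
            delegate.clearThreadKey();           // always unbind afterwards
        }
    }
}
```

This pattern relies on the send and the upload happening on the same thread, which holds for a `DirectChannel` like the one in the question but would not hold for an executor or queue channel.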
