I've been trying to set up an MSK connector to dump a topic into an Iceberg table using iceberg-kafka-connect. The Glue catalog is in another account, so I have to use AssumeRoleAwsClientFactory. This is the configuration:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false
topics=msk_topic
iceberg.tables=glue_database.msk_topic
iceberg.tables.evolve-schema-enabled=true
iceberg.tables.schema-force-optional=true
iceberg.tables.auto-create-enabled=true
iceberg.tables.auto-create-props.format-version=2
iceberg.control.commit.interval-ms=60000
iceberg.catalog.warehouse=s3a://path/to/bucket/iceberg
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.glue.id=xxxxxxxxx
iceberg.catalog.client.factory=org.apache.iceberg.aws.AssumeRoleAwsClientFactory
iceberg.catalog.client.assume-role.arn=arn:aws:iam::xxxxxxxxx:role/msk-connector-role
iceberg.catalog.client.assume-role.region=eu-west-1
iceberg.catalog.client.region=eu-west-1

However, I see the following error:

software.amazon.awssdk.core.exception.SdkClientException: Unable to load region from any of the providers in the chain software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain@7a13edf7: [software.amazon.awssdk.regions.providers.SystemSettingsRegionProvider@789d5db2: Unable to load region from system settings. Region must be specified either via environment variable (AWS_REGION) or  system property (aws.region)., software.amazon.awssdk.regions.providers.AwsProfileRegionProvider@544a6634: No region provided in profile: default, software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@24d89e92: Unable to retrieve region information from EC2 Metadata service. Please make sure the application is running on EC2.
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
at software.amazon.awssdk.regions.providers.AwsRegionProviderChain.getRegion(AwsRegionProviderChain.java:70)
at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.resolveRegion(AwsDefaultClientBuilder.java:293)
at software.amazon.awssdk.utils.AttributeMap$DerivedValue.primeCache(AttributeMap.java:600)
at software.amazon.awssdk.utils.AttributeMap$DerivedValue.get(AttributeMap.java:589)
at software.amazon.awssdk.utils.AttributeMap$Builder.resolveValue(AttributeMap.java:396)
at software.amazon.awssdk.utils.AttributeMap$Builder.internalGet(AttributeMap.java:389)
at software.amazon.awssdk.utils.AttributeMap$Builder.access$1300(AttributeMap.java:201)
at software.amazon.awssdk.utils.AttributeMap$Builder$1.get(AttributeMap.java:399)
at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.resolveSigningRegion(AwsDefaultClientBuilder.java:260)
at software.amazon.awssdk.utils.AttributeMap$DerivedValue.primeCache(AttributeMap.java:600)
at software.amazon.awssdk.utils.AttributeMap$DerivedValue.get(AttributeMap.java:589)
at software.amazon.awssdk.utils.AttributeMap$Builder.resolveValue(AttributeMap.java:396)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at software.amazon.awssdk.utils.AttributeMap$Builder.build(AttributeMap.java:362)
at software.amazon.awssdk.core.client.config.SdkClientConfiguration$Builder.build(SdkClientConfiguration.java:232)
at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.finalizeAwsConfiguration(AwsDefaultClientBuilder.java:184)
at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.finalizeChildConfiguration(AwsDefaultClientBuilder.java:161)
at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.syncClientConfiguration(SdkDefaultClientBuilder.java:188)
at software.amazon.awssdk.services.sts.DefaultStsClientBuilder.buildClient(DefaultStsClientBuilder.java:36)
at software.amazon.awssdk.services.sts.DefaultStsClientBuilder.buildClient(DefaultStsClientBuilder.java:25)
at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.build(SdkDefaultClientBuilder.java:155)
at org.apache.iceberg.aws.AssumeRoleAwsClientFactory.sts(AssumeRoleAwsClientFactory.java:131)
at org.apache.iceberg.aws.AssumeRoleAwsClientFactory.applyAssumeRoleConfigurations(AssumeRoleAwsClientFactory.java:105)
at software.amazon.awssdk.utils.builder.SdkBuilder.applyMutation(SdkBuilder.java:61)
at org.apache.iceberg.aws.AssumeRoleAwsClientFactory.glue(AssumeRoleAwsClientFactory.java:56)
at org.apache.iceberg.aws.glue.GlueCatalog.initialize(GlueCatalog.java:149)
at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:255)
at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:309)
at io.tabular.iceberg.connect.data.Utilities.loadCatalog(Utilities.java:69)
at io.tabular.iceberg.connect.channel.TaskImpl.<init>(TaskImpl.java:35)
at io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:56)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:641)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:706)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:458)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

The stack trace clearly shows that AssumeRoleAwsClientFactory is being instantiated. Looking at the source code, AssumeRoleAwsClientFactory should be using the provided region.
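One way to narrow this down: the trace shows the failure happening while the STS client is built (`AssumeRoleAwsClientFactory.sts` → `DefaultStsClientBuilder.buildClient`), and the error says the default region provider chain found nothing in the environment variable `AWS_REGION` or the system property `aws.region`. The following is a minimal diagnostic sketch, not the SDK's actual implementation, that checks just those two sources inside the worker JVM. The class name `RegionCheck`, the helper `resolveRegion`, and the lookup order (system property first) are assumptions made for illustration.

```java
import java.util.Optional;

public class RegionCheck {

    // Hypothetical helper: looks at the two "system settings" sources named in
    // the error message. The order (system property, then env var) is an assumption.
    static Optional<String> resolveRegion(String systemProperty, String envVar) {
        if (systemProperty != null && !systemProperty.isBlank()) {
            return Optional.of(systemProperty.trim());
        }
        if (envVar != null && !envVar.isBlank()) {
            return Optional.of(envVar.trim());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Read the real values from the running JVM and report what was found.
        Optional<String> region = resolveRegion(
                System.getProperty("aws.region"),
                System.getenv("AWS_REGION"));
        System.out.println(region
                .map(r -> "region from system settings: " + r)
                .orElse("no region in system settings"));
    }
}
```

If this reports no region, it would be consistent with the STS client not receiving the connector's `client.assume-role.region` value and falling back to the default chain, although that remains to be confirmed against the Iceberg version in use.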