admin管理员组文章数量:1314556
Environmant:
- NIFI: 2.1.0, vs 1.28.1
- Kafka: kafka cluster built with bitnami/kafka:3.9.0 in docker
- OS: Linux
The server.properties in 3 kafka nodes has been configured below (key portions):
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=10485760
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=10485760
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=10485760
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
In Apache NIFI 2.1.0, the PublishKafka's properties has been configured:
Whenever I publish a message large than 1MB, PublishKafka will fail and the flowfile will be routed to Failure relationship. Any message smaller than 1MB can successfully be publised to the kafka cluster.
However, if I use Apache NIFI 1.28.1 and publish same message (larger than 1MB) to the same kafka cluster, it will be successful.
Below is the nifi-app.log regarding PublishKafka portion
2025-02-01 07:53:22,145 INFO [NiFi Web Server-3323] o.a.n.c.s.StandardProcessScheduler Running once PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7]
2025-02-01 07:53:22,146 INFO [Timer-Driven Process Thread-1] o.a.k.clients.producer.ProducerConfig ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [192.168.34.89:9192]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-2
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = []
key.serializer = class .apache.kafkamon.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = class .apache.kafka.clients.producer.internals.DefaultPartitioner
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.header.urlencode = false
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class .apache.kafkamon.serialization.ByteArraySerializer
2025-02-01 07:53:22,147 INFO [Timer-Driven Process Thread-1] o.a.k.c.t.i.KafkaMetricsCollector initializing Kafka metrics collector
2025-02-01 07:53:22,148 WARN [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] DefaultPartitioner is deprecated. Please clear partitioner.class configuration setting to get the default partitioning behavior
2025-02-01 07:53:22,149 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] Instantiated an idempotent producer.
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.k.clients.producer.ProducerConfig These configurations '[max.poll.records, isolation.level, default.api.timeout.ms]' were supplied but are not used yet.
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.kafkamon.utils.AppInfoParser Kafka version: 3.9.0
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.kafkamon.utils.AppInfoParser Kafka commitId: 84caaa6e9da06435
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.kafkamon.utils.AppInfoParser Kafka startTimeMs: 1738367602152
2025-02-01 07:53:22,204 INFO [kafka-producer-network-thread | producer-2] .apache.kafka.clients.Metadata [Producer clientId=producer-2] Cluster ID: abcdefghijklmnopqrstuv
2025-02-01 07:53:22,205 INFO [kafka-producer-network-thread | producer-2] o.a.k.c.p.internals.TransactionManager [Producer clientId=producer-2] ProducerId set to 6042 with epoch 0
2025-02-01 07:53:22,205 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.StandardProcessScheduler Stopping PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7]
2025-02-01 07:53:22,205 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7] to run
2025-02-01 07:53:22,206 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
2025-02-01 07:53:22,207 INFO [Timer-Driven Process Thread-1] .apache.kafkamon.metrics.Metrics Metrics scheduler closed
2025-02-01 07:53:22,208 INFO [Timer-Driven Process Thread-1] .apache.kafkamon.metrics.Metrics Closing reporter .apache.kafkamon.metrics.JmxReporter
2025-02-01 07:53:22,208 INFO [Timer-Driven Process Thread-1] .apache.kafkamon.metrics.Metrics Closing reporter .apache.kafkamon.telemetry.internals.ClientTelemetryReporter
2025-02-01 07:53:22,208 INFO [Timer-Driven Process Thread-1] .apache.kafkamon.metrics.Metrics Metrics reporters closed
2025-02-01 07:53:22,209 INFO [Timer-Driven Process Thread-1] o.a.kafkamon.utils.AppInfoParser App info kafka.producer for producer-2 unregistered
2025-02-01 07:53:22,209 INFO [Timer-Driven Process Thread-1] o.a.n.controller.StandardProcessorNode PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7] has completely stopped. Completing any associated Futures.
2025-02-01 07:53:22,514 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller .apache.nifi.controller.FlowController@13263226 // Another save pending = false
^C
nifi@nifi-single:/opt/nifi/nifi-current/logs$
The Apache NIFI 1.28.1, the properties in PublishKafka_2_6 are as below
As the above, I thinkg the configurations in Kafka cluster should be OK. And the problem should be Apache NIFI 2.1.0
How to config PublishKafka in Apache NIFI 2.1.0 so that it can publish a message larger than 1MB ?
本文标签:
版权声明:本文标题:PublishKafka in Apache NIFI 2.1.0 can not publish message larger than 1MB, even MAX REQUEST SIZE is configed as 10MB - Stack Ove 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1741967191a2407619.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论