Environment:

  • NiFi: 2.1.0 (vs. 1.28.1)
  • Kafka: 3-node cluster built with bitnami/kafka:3.9.0 in Docker
  • OS: Linux

The server.properties on each of the 3 Kafka nodes is configured as below (key portions):

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=10485760

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=10485760

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=10485760
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
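
To double-check that these values are actually live on the running brokers (the Bitnami image is normally configured through environment variables, so a hand-edited server.properties may not take effect), a small AdminClient program can read the running config back. This is only a sanity-check sketch: the broker id "1" is an assumption, and the bootstrap address is the one that appears in the NiFi log below.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CheckBrokerConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.34.89:9192");
        try (AdminClient admin = AdminClient.create(props)) {
            // Broker id "1" is an assumption; adjust to the actual node ids.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(List.of(broker)).all().get();
            // Print the two large-message settings back from the live broker.
            for (String name : List.of("message.max.bytes", "replica.fetch.max.bytes")) {
                System.out.println(name + " = " + configs.get(broker).get(name).value());
            }
        }
    }
}

A topic-level max.message.bytes override can also cap an individual topic below the broker setting, so checking the topic config the same way (ConfigResource.Type.TOPIC) may be worthwhile.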

In Apache NiFi 2.1.0, the PublishKafka processor's properties are configured as follows:

Whenever I publish a message larger than 1 MB, PublishKafka fails and the FlowFile is routed to the failure relationship. Any message smaller than 1 MB is published to the Kafka cluster successfully.

However, if I use Apache NiFi 1.28.1 to publish the same message (larger than 1 MB) to the same Kafka cluster, it succeeds.
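
For comparison outside NiFi, here is a minimal standalone Java producer sketch against the same cluster (the topic name "test-topic" is a placeholder, and the bootstrap address is taken from the log below). With the Kafka producer's default max.request.size of 1048576 bytes, a 2 MB record fails client-side with a RecordTooLargeException before it ever reaches the broker; raising that single setting makes the same send succeed.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Properties;

public class LargeMessageProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.34.89:9192");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // The producer default is 1048576 (1 MB): a 2 MB record then fails
        // client-side with RecordTooLargeException. Uncommenting this line
        // makes the same send succeed against the brokers configured above.
        // props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            byte[] payload = new byte[2 * 1024 * 1024]; // 2 MB test message
            producer.send(new ProducerRecord<>("test-topic", payload)).get();
        }
    }
}

This points at the producer-side max.request.size rather than the broker configuration.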

Below is the portion of nifi-app.log covering PublishKafka:

2025-02-01 07:53:22,145 INFO [NiFi Web Server-3323] o.a.n.c.s.StandardProcessScheduler Running once PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7]
2025-02-01 07:53:22,146 INFO [Timer-Driven Process Thread-1] o.a.k.clients.producer.ProducerConfig ProducerConfig values:
        acks = -1
        auto.include.jmx.reporter = true
        batch.size = 16384
        bootstrap.servers = [192.168.34.89:9192]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-2
        compression.gzip.level = -1
        compression.lz4.level = 9
        compression.type = none
        compression.zstd.level = 3
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        enable.metrics.push = true
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 5000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metadata.recovery.strategy = none
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.adaptive.partitioning.enable = true
        partitioner.availability.timeout.ms = 0
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        partitioner.ignore.keys = false
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.max.ms = 1000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.header.urlencode = false
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2025-02-01 07:53:22,147 INFO [Timer-Driven Process Thread-1] o.a.k.c.t.i.KafkaMetricsCollector initializing Kafka metrics collector
2025-02-01 07:53:22,148 WARN [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] DefaultPartitioner is deprecated.  Please clear partitioner.class configuration setting to get the default partitioning behavior
2025-02-01 07:53:22,149 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] Instantiated an idempotent producer.
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.k.clients.producer.ProducerConfig These configurations '[max.poll.records, isolation.level, default.api.timeout.ms]' were supplied but are not used yet.
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser Kafka version: 3.9.0
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser Kafka commitId: 84caaa6e9da06435
2025-02-01 07:53:22,152 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser Kafka startTimeMs: 1738367602152
2025-02-01 07:53:22,204 INFO [kafka-producer-network-thread | producer-2] org.apache.kafka.clients.Metadata [Producer clientId=producer-2] Cluster ID: abcdefghijklmnopqrstuv
2025-02-01 07:53:22,205 INFO [kafka-producer-network-thread | producer-2] o.a.k.c.p.internals.TransactionManager [Producer clientId=producer-2] ProducerId set to 6042 with epoch 0
2025-02-01 07:53:22,205 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.StandardProcessScheduler Stopping PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7]
2025-02-01 07:53:22,205 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7] to run
2025-02-01 07:53:22,206 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
2025-02-01 07:53:22,207 INFO [Timer-Driven Process Thread-1] org.apache.kafka.common.metrics.Metrics Metrics scheduler closed
2025-02-01 07:53:22,208 INFO [Timer-Driven Process Thread-1] org.apache.kafka.common.metrics.Metrics Closing reporter org.apache.kafka.common.metrics.JmxReporter
2025-02-01 07:53:22,208 INFO [Timer-Driven Process Thread-1] org.apache.kafka.common.metrics.Metrics Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
2025-02-01 07:53:22,208 INFO [Timer-Driven Process Thread-1] org.apache.kafka.common.metrics.Metrics Metrics reporters closed
2025-02-01 07:53:22,209 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser App info kafka.producer for producer-2 unregistered
2025-02-01 07:53:22,209 INFO [Timer-Driven Process Thread-1] o.a.n.controller.StandardProcessorNode PublishKafka[id=af69e9c7-0194-1000-382c-4bbca33ca7d7] has completely stopped. Completing any associated Futures.
2025-02-01 07:53:22,514 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@13263226 // Another save pending = false

In Apache NiFi 1.28.1, the properties of PublishKafka_2_6 are as below:

Based on the above, I think the configuration of the Kafka cluster is OK. Note that the ProducerConfig dump in the log shows max.request.size = 1048576, the Kafka client's default of 1 MB, which exactly matches the size at which publishing starts to fail. So the problem appears to be on the Apache NiFi 2.1.0 side.

How do I configure PublishKafka in Apache NiFi 2.1.0 so that it can publish a message larger than 1 MB?
