I have been using Spark v3.5 Spark Streaming for the use case below, and I am observing the following issue in one of the environments. Any assistance with this would be of valuable help.
Use case:
The application uses Spark Streaming to consume messages from one source topic, applies business logic to filter out some of these messages, and publishes the remaining messages to the target topics accordingly.
ExecutorMem: 6G, ExecutorCPU: 3 cores
Initial Configuration:
Kafka Source topic: 24 brokers, 400 partitions
Kafka Target topic: 200 partitions
executorsInstances: 15
executors cores: 3
Input rate: 200-250 messages/sec
BackPressure=true
BackPressureInitialRate: 200000 (default in pom)
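For reference, a minimal sketch of how this initial configuration maps onto the standard Spark properties (the app name is a placeholder; in our build the actual values are injected via the pom):

```scala
// Sketch of the initial configuration expressed as standard Spark properties
// (values mirror the list above; the app name is a placeholder).
import org.apache.spark.SparkConf

val initialConf = new SparkConf()
  .setAppName("kafka-filter-stream")                          // placeholder
  .set("spark.executor.instances", "15")
  .set("spark.executor.cores", "3")
  .set("spark.executor.memory", "6g")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "200000")  // default picked up from the pom
```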
There is now a requirement for an occasional "ban refresh" trigger, which will produce messages at 800-1000 messages/sec.
New configuration:
Stage 1:
Kafka Source topic: 24 brokers, 1200 partitions
Kafka Target topic: 800 partitions
executorsInstances: 120
executors cores: 10
Input rate: 200-250 messages/sec normally, 800-1000 messages/sec during ban refresh
BackPressure=true
BackPressureInitialRate: 100
Spark batch interval: 0.5 sec
maxRatePerPartition: 1000
All 120 executors come up at startup because of the lag already piled up on Kafka.
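A minimal sketch of how the Stage 1 knobs above are wired into a direct Kafka stream; the broker address, group id, and topic name are placeholders rather than the real ones:

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf()
  .setAppName("kafka-filter-stream")                          // placeholder
  .set("spark.executor.instances", "120")
  .set("spark.executor.cores", "10")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "100")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

// the 0.5 sec batch interval listed above
val ssc = new StreamingContext(conf, Milliseconds(500))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> "broker1:9092",  // placeholder
  ConsumerConfig.GROUP_ID_CONFIG                 -> "filter-app",    // placeholder
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("source-topic"), kafkaParams)  // placeholder topic
)

// business-logic filtering and publishing to the target topic go here,
// followed by ssc.start() and ssc.awaitTermination()
```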
Problem:
When there was already a huge lag in the system, at application startup the backpressure initial rate meant that only 1 message was picked up from each partition, so each Spark Streaming batch contained only 1200 records. Over a period of time this delay cleared, and the system worked fine without scheduling delay for a few hours with incoming traffic consistent at 100-200 tps; during this period the batch size dropped to 50-100 records/batch.

Then a ban refresh was triggered and the incoming rate increased to 800-1000 tps. At that point the executors could not cope and the lag kept increasing on the Kafka side. The batch processing rate kept dropping, which eventually introduced scheduling delay and (through backpressure) reduced the batch size to almost 15-20 records/batch. The observation is that small batches were taking more processing time than big batches.

Another issue: a NETWORK_CONNECTION error while fetching metadata from Kafka during publishing.
Solution:
Removed the backpressure initial-rate parameter, so that when there are millions of records already piled up on Kafka, the batches created contain more records per batch than the original 1200 records/batch; batches with more records seem to have a better processing rate than small batches. Also disabled backpressure (though not ideal) so that we have a consistent records/batch size to deal with and improved processing time. Handled the NETWORK_CONNECTION issue.
Stage 2:
Kafka Source topic: 24 brokers, 1200 partitions
Kafka Target topic: 800 partitions
executorsInstances: 120
executors cores: 10
Input rate: 200-250 messages/sec normally, 800-1000 messages/sec during ban refresh
BackPressure=false
maxRatePerPartition: 1000
Spark batch interval: 0.5 sec
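The Stage 2 delta, sketched as Spark properties (illustrative only; in our build these values come from deployment properties). Note that with 1200 partitions, maxRatePerPartition = 1000 and a 0.5 sec batch interval, a batch is capped at roughly 1200 x 1000 x 0.5 = 600,000 records:

```scala
// Stage 2 delta relative to Stage 1: backpressure off, no initial-rate override.
import org.apache.spark.SparkConf

val stage2Conf = new SparkConf()
  .set("spark.executor.instances", "120")
  .set("spark.executor.cores", "10")
  .set("spark.streaming.backpressure.enabled", "false")      // backpressure disabled
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // hard per-partition cap per second
// spark.streaming.backpressure.initialRate is intentionally no longer set
```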
What went well:
The already accumulated lag (millions of records) gets cleared faster with more records per batch. The NETWORK_CONNECTION issue was resolved.
Problem persists:
Once the lag is cleared and input traffic is consistent, after a period of time the batch processing time still keeps increasing and lag starts to build up on Kafka. And at times of ban refresh, when traffic increases, the lag keeps accumulating. When the batches held a large number of records, in the initial stage, the processing rate reported by Smart was 2k-4k messages per sec. But over time, when the incoming data flow is lower (200-400 messages/sec) and batches hold ~300 records, the processing rate drops to 100-200 messages per sec. This also leads to increased lag when we have high load, since the processing rate has dropped. Not all executors are at high CPU usage: some run at high CPU and some at low CPU.
Solution:
Since batches with more records have a better processing rate (even though the overall processing time per batch is higher), increase the batch interval from 0.5 sec to 3 sec so that batches of 600-700 records/batch are created. Also reduced executorsInstances to 40 to check the effectiveness of CPU utilization and processing.
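A sketch of the Stage 3 change, assuming the batch interval is set where the StreamingContext is created (values mirror the list below):

```scala
// Stage 3 delta: a larger batch interval so each batch accumulates more records,
// and fewer executors to check CPU utilization.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val stage3Conf = new SparkConf()
  .set("spark.executor.instances", "40")
  .set("spark.executor.cores", "10")
  .set("spark.streaming.backpressure.enabled", "false")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

// batch interval raised from 500 ms to 3 sec:
// at ~200 messages/sec steady input this yields roughly 600 records per batch
val ssc3 = new StreamingContext(stage3Conf, Seconds(3))
```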
Stage 3:
Kafka Source topic: 24 brokers, 1200 partitions
Kafka Target topic: 800 partitions
executorsInstances: 40
executors cores: 10
Input rate: 200-250 messages/sec normally, 800-1000 messages/sec during ban refresh
BackPressure=false
maxRatePerPartition: 1000
Spark batch interval: 3 sec
What went well:
The already accumulated lag (millions of records) gets cleared faster with more records per batch. With a 3 sec batch interval, fewer batches are created, each with more records per batch.
Problem persists:
As we increase the batch interval, when the load is low and the processing time per batch drops, some executors scale down because dynamic resource allocation is enabled on Smart. When the input load increases again, new executors do scale up, but we do not see them reach high CPU usage like the other executors. We also see heartbeat-interval exceptions on the driver-executor connection for some executors. Some tasks in a batch take 30 seconds or more on one of the executors, which impacts the overall processing time and delays the next batch. This leads to increased lag on Kafka.
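For reference, a sketch of the standard Spark settings that govern the two behaviors above (executor scale-down under dynamic allocation, and the executor-driver heartbeat), assuming core dynamic allocation rather than the streaming-specific variant; the values are illustrative, not what we run in production:

```scala
// Illustrative values only: the standard properties controlling executor scale-down
// under dynamic allocation and the executor->driver heartbeat/network timeouts.
import org.apache.spark.SparkConf

val allocationConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "40")            // illustrative floor to limit scale-down
  .set("spark.dynamicAllocation.executorIdleTimeout", "120s")   // how long an idle executor survives
  .set("spark.executor.heartbeatInterval", "20s")               // must stay well below spark.network.timeout
  .set("spark.network.timeout", "300s")
```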
Reason for connection timeout:
Looking into the 30-second tasks, the logs show Kafka connection timeout exceptions on publish in those executors, which delay the processing time of the overall batch. We need to resolve this.
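A sketch of the producer-side timeout and retry settings that interact with publish-time connection timeouts; the keys are the standard kafka-clients ProducerConfig ones, the values are examples only, and the broker address is a placeholder. (As an aside, the kafka-clients default for request.timeout.ms is 30000 ms, which lines up with the ~30 sec task stalls described above.)

```scala
// Illustrative Kafka producer settings relevant to connection timeouts on publish.
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")  // placeholder
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")   // per-request timeout (default 30000 ms)
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000") // total send time including retries
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000")
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")         // bound on blocking metadata fetches
```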
Overall memory utilization was not found to be causing any issue. CPU utilization does go high on the executors.
Looking forward to a response.
Expected behavior: the batch processing rate should not degrade over time, whether small or large batches are coming in.