I'm experiencing two issues with Kafka Streams' processValues() and suppress operations:
- Getting NPE when using processValues():
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return inputStream -> inputStream
        .processValues(() -> new HeartbeatProcessor()) // sends heartbeat events every second
        .groupByKey()
        .windowedBy(SessionWindows.ofInactivityGapAndGrace(
            Duration.ofSeconds(5), // inactivityGap
            Duration.ofSeconds(1)  // gracePeriod
        ))
        .aggregate(...)
        .suppress(Suppressed.untilWindowCloses(
            Suppressed.BufferConfig.unbounded()
        ));
}
Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.topic" is null
at org.apache.kafka.streams.processor.internals.ProcessorRecordContext.serialize(ProcessorRecordContext.java:97)
Note: When using process() instead of processValues(), session windows close properly but I want to avoid repartitioning.
- Even after fixing the NPE by setting a topic name in ProcessorRecordContext, the suppressed session windows don't close properly:
public class HeartbeatProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(FixedKeyProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, this::generateHeartbeat);
    }

    @Override
    public void process(FixedKeyRecord<String, String> record) {
        context.forward(record);
    }

    private void generateHeartbeat(long timestamp) {
        if (context instanceof InternalProcessorContext internalContext) {
            internalContext.setRecordContext(
                new ProcessorRecordContext(
                    timestamp,
                    0L,
                    context.taskId().partition(),
                    "dummy-topic",
                    new RecordHeaders()
                )
            );
        }
        Record<String, String> record = new Record<>(
            "heartbeat-" + timestamp, // different key every second
            "heartbeat",
            timestamp
        );
        context.forward(InternalFixedKeyRecordFactory.create(record));
    }
}
I'm using processValues() instead of process() to avoid repartitioning, and sending heartbeat events with different keys every second to trigger session closures, but windows aren't closing consistently.
Version: org.apache.kafka:kafka-streams:3.7.0
Asked Nov 21, 2024 at 10:15 by suno3; edited Nov 22, 2024 at 2:19.
1 Answer
"but windows aren't closing consistently."
Can you describe in more detail what you observe, and what you expect to get?
Btw: Given that you are using internals from Kafka Streams, there is no guarantee that this will actually work... You should strictly avoid using anything internal, as it might change at any point (syntactically, as well as semantically).
I believe that is also why you get an NPE to begin with. Using context.forward() inside a Punctuator in processValues() is not officially supported; see https://issues.apache.org/jira/browse/KAFKA-16585
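To make "what you expect to get" concrete, here is a minimal sketch of when a suppressed session should be emitted. It is a toy model in plain Java, not Kafka Streams code, and the class and method names are made up for illustration. It rests on one assumption about 3.x behavior: with untilWindowCloses, a session is emitted once the task's stream time reaches session end + inactivity gap + grace period, and stream time only moves forward when a record with a later timestamp arrives on that task.

```java
import java.time.Duration;

/** Toy model of stream-time-driven session closing. Not Kafka Streams code. */
public class SessionCloseSketch {

    /**
     * Stream time at which a suppressed session would be emitted.
     * Assumption: emission happens once stream time >= session end + gap + grace.
     */
    public static long closeTimeMs(long sessionEndMs, Duration gap, Duration grace) {
        return sessionEndMs + gap.toMillis() + grace.toMillis();
    }

    /**
     * Replays later record timestamps in arrival order and returns the index of
     * the first one that pushes stream time far enough to close the session,
     * or -1 if none of them does.
     */
    public static int firstClosingRecord(long sessionEndMs, Duration gap, Duration grace,
                                         long[] laterTimestampsMs) {
        long closeTime = closeTimeMs(sessionEndMs, gap, grace);
        long streamTime = sessionEndMs; // stream time has already reached the session end
        for (int i = 0; i < laterTimestampsMs.length; i++) {
            // Stream time is the maximum timestamp seen so far; it never moves backwards.
            streamTime = Math.max(streamTime, laterTimestampsMs[i]);
            if (streamTime >= closeTime) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofSeconds(5);   // inactivityGap from the question
        Duration grace = Duration.ofSeconds(1); // gracePeriod from the question
        long sessionEnd = 10_000L;              // hypothetical timestamp of the last real record

        // Hypothetical heartbeats arriving once per second after the last real record.
        long[] heartbeats = {11_000L, 12_000L, 13_000L, 14_000L, 15_000L, 16_000L, 17_000L};

        System.out.println("close time: " + closeTimeMs(sessionEnd, gap, grace));
        System.out.println("closing heartbeat index: "
                + firstClosingRecord(sessionEnd, gap, grace, heartbeats));
    }
}
```

If that model matches what you expect, the thing to check is whether the heartbeats actually advance stream time on the task that holds the open session. Stream time is tracked per task, so a heartbeat that never reaches that task, or that carries a timestamp older than the close time, will not close anything.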