I'm experiencing two issues with Kafka Streams' processValues() and suppress operations:

  1. I get an NPE when using processValues():
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return inputStream -> inputStream
        .processValues(() -> new HeartbeatProcessor())  // sends heartbeat events every second
        .groupByKey()
        .windowedBy(SessionWindows.ofInactivityGapAndGrace(
            Duration.ofSeconds(5),  // inactivityGap
            Duration.ofSeconds(1)   // gracePeriod
        ))
        .aggregate(...)
        .suppress(Suppressed.untilWindowCloses(
            Suppressed.BufferConfig.unbounded()
        ));
}
Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.topic" is null
   at org.apache.kafka.streams.processor.internals.ProcessorRecordContext.serialize(ProcessorRecordContext.java:97)

Note: When using process() instead of processValues(), session windows close properly, but I want to avoid repartitioning.

  2. Even after working around the NPE by setting a topic name in the ProcessorRecordContext, the suppressed session windows don't close properly:
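import java.time.Duration;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

public class HeartbeatProcessor implements FixedKeyProcessor<String, String, String> {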
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(FixedKeyProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, this::generateHeartbeat);
    }

    @Override
    public void process(FixedKeyRecord<String, String> record) {
        context.forward(record);
    }

    private void generateHeartbeat(long timestamp) {
        if (context instanceof InternalProcessorContext internalContext) {
            internalContext.setRecordContext(
                new ProcessorRecordContext(
                    timestamp,
                    0L,
                    context.taskId().partition(),
                    "dummy-topic",
                    new RecordHeaders()
                )
            );
        }
        
        Record<String, String> record = new Record<>(
            "heartbeat-" + timestamp,  // different key every second
            "heartbeat",
            timestamp
        );
        context.forward(InternalFixedKeyRecordFactory.create(record));
    }
}

I'm using processValues() instead of process() to avoid repartitioning, and sending heartbeat events with different keys every second to trigger session closures, but windows aren't closing consistently.

Version: org.apache.kafka:kafka-streams:3.7.0
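The heartbeat approach depends on stream time, not wall-clock time, to close sessions. Below is a simplified, hypothetical model of session windows followed by suppress(Suppressed.untilWindowCloses(...)) inside a single task; it is not Kafka Streams code, and SessionSuppressModel, process() and emitted() are illustrative names. It assumes, based on our reading of Kafka Streams 3.x, that stream time is the highest record timestamp the task has seen and that a suppressed session [start, end] is released once stream time reaches end + inactivityGap + grace. It ignores late-record dropping and aggregate values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of session windows + suppress(untilWindowCloses)
// for ONE task (one partition). NOT Kafka Streams code. Assumption: a session
// [start, end] is released once streamTime >= end + inactivityGap + grace.
// Late-record dropping, aggregate values and persistence are ignored.
public class SessionSuppressModel {

    public record Session(String key, long start, long end) {}

    private final long inactivityGapMs;
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE;               // highest record timestamp seen
    private final List<Session> buffered = new ArrayList<>(); // suppressed, not yet emitted
    private final List<Session> emitted = new ArrayList<>();

    public SessionSuppressModel(long inactivityGapMs, long graceMs) {
        this.inactivityGapMs = inactivityGapMs;
        this.graceMs = graceMs;
    }

    // Processes one record. Only record timestamps move stream time forward;
    // wall-clock time passing on its own never does.
    public void process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);

        // Merge the record with every buffered session of the same key that
        // lies within one inactivity gap of the record's timestamp.
        long start = timestampMs;
        long end = timestampMs;
        List<Session> merged = new ArrayList<>();
        for (Session s : buffered) {
            if (s.key().equals(key)
                    && timestampMs >= s.start() - inactivityGapMs
                    && timestampMs <= s.end() + inactivityGapMs) {
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                merged.add(s);
            }
        }
        buffered.removeAll(merged);
        buffered.add(new Session(key, start, end));

        // Release every buffered session whose end is old enough.
        long expiryTimeMs = streamTimeMs - inactivityGapMs - graceMs;
        List<Session> closed = new ArrayList<>();
        for (Session s : buffered) {
            if (s.end() <= expiryTimeMs) {
                closed.add(s);
            }
        }
        buffered.removeAll(closed);
        emitted.addAll(closed);
    }

    // Final results released downstream so far.
    public List<Session> emitted() {
        return List.copyOf(emitted);
    }

    public static void main(String[] args) {
        // 5 s inactivity gap and 1 s grace, matching the question's SessionWindows.
        SessionSuppressModel model = new SessionSuppressModel(5_000, 1_000);
        model.process("user-1", 10_000);
        model.process("user-1", 12_000);          // merges into session [10000, 12000]
        model.process("heartbeat-17000", 17_000); // expiry 11000 < 12000: still buffered
        System.out.println("after 17000: " + model.emitted());
        // prints: after 17000: []
        model.process("heartbeat-18000", 18_000); // expiry 12000 >= 12000: released
        System.out.println("after 18000: " + model.emitted());
        // prints: after 18000: [Session[key=user-1, start=10000, end=12000]]
    }
}
```

Under these assumptions, with a 5 s gap and 1 s grace, a session ending at 12000 is held until a record with timestamp 18000 or later reaches the same task. Heartbeats only help if they are processed by that task and carry timestamps past that threshold.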

Asked Nov 21, 2024 at 10:15 by suno3 (edited Nov 22, 2024 at 2:19)

1 Answer

"...but windows aren't closing consistently."

Can you describe in more detail what you observe, and what you expect to get?

By the way: since you are using Kafka Streams internals, there is no guarantee that this will actually work. You should strictly avoid using anything internal, as it might change at any point, both syntactically and semantically.

I believe that is also why you get an NPE to begin with. Calling context.forward() inside a Punctuator of a processValues() processor is not officially supported; see https://issues.apache.org/jira/browse/KAFKA-16585
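As a rough illustration of the failure the answer refers to, here is a simplified, hypothetical model of the call in the stack trace. It is not Kafka Streams code; RecordContextNpeModel, sourceRecordContext(), punctuationContext() and the topic name input-topic are made up for the sketch. The trace only shows that ProcessorRecordContext.serialize() calls getBytes() on a null topic. The model assumes that a punctuation runs without an input record, so its record context has no topic (modeled as offset -1, partition -1, topic null), and that a downstream store, presumably the suppression buffer, serializes that context.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of serializing a record context.
// NOT Kafka Streams code; all names are illustrative.
public class RecordContextNpeModel {

    static final class RecordContext {
        final long timestamp;
        final long offset;
        final int partition;
        final String topic; // assumed null when no input record is being processed

        RecordContext(long timestamp, long offset, int partition, String topic) {
            this.timestamp = timestamp;
            this.offset = offset;
            this.partition = partition;
            this.topic = topic;
        }

        // Like the frame in the stack trace, this dereferences topic
        // unconditionally, so a null topic throws a NullPointerException.
        byte[] serialize() {
            byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 2 + Integer.BYTES * 2 + topicBytes.length);
            buf.putLong(timestamp).putLong(offset).putInt(partition)
               .putInt(topicBytes.length).put(topicBytes);
            return buf.array();
        }
    }

    // Context of a record read from a (hypothetical) source topic.
    static RecordContext sourceRecordContext(long timestamp, long offset, int partition) {
        return new RecordContext(timestamp, offset, partition, "input-topic");
    }

    // Assumed context during a wall-clock punctuation: no input record, no topic.
    static RecordContext punctuationContext(long timestamp) {
        return new RecordContext(timestamp, -1L, -1, null);
    }

    public static void main(String[] args) {
        System.out.println(sourceRecordContext(1_000L, 42L, 0).serialize().length + " bytes");
        // prints: 35 bytes
        try {
            punctuationContext(2_000L).serialize();
        } catch (NullPointerException e) {
            // With helpful NPE messages (JDK 15+ default) the text should mirror the question's error.
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

If this reading is right, it would also explain the observation in the question: process() forces a repartition topic before groupByKey(), so records reach the aggregation with a context read from that topic. That explanation is an inference, not something confirmed in the thread.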

Tags: Kafka Streams NPE in ProcessorRecordContext and Suppress issues with processValues() (Stack Overflow)