I am using the Confluent Schema Registry Docker image confluentinc/cp-schema-registry:7.7.1.

I have set the compatibility level to FULL_TRANSITIVE on the subject clients-value.
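For reference, a subject-level compatibility setting like this is normally applied through the Schema Registry REST API with PUT /config/{subject}. The sketch below only builds that request with the JDK HTTP client and does not send it. The registry address http://localhost:8081 and the class name are assumptions for illustration, not part of my actual setup.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class CompatibilityRequestSketch {

    // Assumption: the registry listens on localhost:8081; adjust for your environment.
    static final String REGISTRY_URL = "http://localhost:8081";

    /** Builds (but does not send) the PUT /config/{subject} request. */
    static HttpRequest build(String subject, String level) {
        String body = "{\"compatibility\": \"" + level + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(REGISTRY_URL + "/config/" + subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("clients-value", "FULL_TRANSITIVE");
        // Print the method and target so the request can be checked before sending.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually apply the setting, the request could be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).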

I define a version V1 of my project event-entities (it contains the Avro schemas and therefore the Avro-generated classes) and register the schemas with the registry. I plug V1 into a Publisher app and can publish messages to Kafka.

My publisher uses io.confluent.kafka.serializers.KafkaAvroSerializer with AUTO_REGISTER_SCHEMAS set to false and USE_LATEST_VERSION set to true.
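As a minimal sketch, the serializer-related part of that configuration looks roughly like this when written with plain property keys (auto.register.schemas and use.latest.version are the keys behind the two constants). The broker and registry addresses and the class name are placeholders I am assuming here, and the rest of the producer settings are omitted.

```java
import java.util.Properties;

final class PublisherSerializerConfigSketch {

    /** Returns only the serializer-related producer settings described above. */
    static Properties build(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", registryUrl);
        // Do not register schemas from the producer; they are registered separately.
        props.put("auto.register.schemas", "false");
        // Look up the latest registered version of the subject when serializing.
        props.put("use.latest.version", "true");
        return props;
    }

    public static void main(String[] args) {
        // Assumption: local broker and registry addresses, for illustration only.
        Properties props = build("localhost:9092", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```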

I then add a new optional field to a schema using {"name": "pocTitle", "type": ["string", "null"], "default": null}. I build a V2 version of event-entities and register the new schemas with the Confluent Schema Registry. I then stop and restart my Publisher app, which is still on V1 of event-entities.

I expect to still be able to publish, since I am only adding an optional field. Instead, I get the stack trace below:

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:185) ~[kafka-avro-serializer-7.7.2.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:68) ~[kafka-avro-serializer-7.7.2.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:57) ~[kafka-avro-serializer-7.7.2.jar:na]
    at com.mwiahmon.events.avro.serializers.CustomKafkaAvroSerializer.serialize(CustomKafkaAvroSerializer.java:43) ~[event-entities-1.0.54-AVRO-00-SNAPSHOT.jar:na]
    at com.mwiahmon.events.avro.serializers.CustomKafkaAvroSerializer.serialize(CustomKafkaAvroSerializer.java:19) ~[event-entities-1.0.54-AVRO-00-SNAPSHOT.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.8.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1044) ~[kafka-clients-3.8.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:991) ~[kafka-clients-3.8.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:873) ~[kafka-clients-3.8.0.jar:na]
    at com.example.demo.eventing.kafka.KafkaEventSender.send(KafkaEventSender.java:93) ~[classes/:na]
    at com.example.demo.controller.TriggerController.publishToKafkaClients(TriggerController.java:37) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:255) ~[spring-web-6.1.14.jar:6.1.14]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:188) ~[spring-web-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:926) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:831) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1089) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:979) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:903) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:564) ~[tomcat-embed-core-10.1.31.jar:6.0]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.1.14.jar:6.1.14]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.31.jar:6.0]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:195) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.1.14.jar:6.1.14]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.14.jar:6.1.14]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.1.14.jar:6.1.14]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.14.jar:6.1.14]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.1.14.jar:6.1.14]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.14.jar:6.1.14]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:483) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:344) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:384) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:905) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1190) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:63) ~[tomcat-embed-core-10.1.31.jar:10.1.31]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: java.lang.NullPointerException: null value for (non-nullable) string at ClientEvent.payload.lastName
    at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88) ~[avro-1.12.0.jar:1.12.0]
    at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30) ~[avro-1.12.0.jar:1.12.0]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84) ~[avro-1.12.0.jar:1.12.0]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:203) ~[kafka-avro-serializer-7.7.2.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:173) ~[kafka-avro-serializer-7.7.2.jar:na]
    ... 60 common frames omitted
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "datum" is null
    at org.apache.avro.specific.SpecificDatumWriter.writeString(SpecificDatumWriter.java:73) ~[avro-1.12.0.jar:1.12.0]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:165) ~[avro-1.12.0.jar:1.12.0]

The object being serialised is shown below; it does not have a pocTitle field:

{"metadata": {"eventId": ValidatedString[value=ebfa9942-2901-4e49-997c-683c9ea01730], "creationTimeEpoch": 1736335441070, "creatorService": "MERLIN", "contactUUID": null, "tenantId": ValidatedString[value=someTenantId]}, "eventType": "CLIENT_CREATED", 
"payload": {"contactUUID": "29dd3855-91f8-466b-a0f8-8f39f3f084d6", "accountUUID": "29dd3855-91f8-466b-a0f8-8f39f3f084d6", "billingUUID": null, 
"firstName": null, "middleName": null, "title": null, "titleUUID": null, 
"lastName": "Blogg",
 "nickName": null, "addresses": [{"numberName": null, "address1": "add1", "address2": "add2", "city": "city", "administrativeArea": null, "postalCode": null, "addressUUID": null, "mailingAddress": null, "addressType": "HOME", "addressTypeUUID": "29dd3855-91f8-466b-a0f8-8f39f3f084d6"}], "phones": [], "emailAddresses": [], "createdTimeEpoch": 1736335441070, "modifiedTimeEpoch": 1736335441070, "isDeactivated": false, "deactivationDateEpoch": null, "dateOfBirth": null, "noMobileSelected": null, "noMobilePrivateSelected": null, "noEmailSelected": null, "noEmailPrivateSelected": null, "applySellAtCost": null, "onlineRegistered": null, "registerTimeEpoch": null, "registrationSiteUUID": null, "registrationSite": null, "popupNote": null, "introducedByUUID": null, "introducedBy": null, "treatmentDiscount": null, "discountCategoryUUID": null, "discountCategory": null, "accountCategories": [], "clientCategoryUUID": "29dd3855-91f8-466b-a0f8-8f39f3f084d6", "clientCategory": "aCat"}}

It is serialised with this schema:

{"type":"record","name":"ClientEvent","namespace":"com.mwiahmon.events.avro.contacts.pms","fields":[
{"name":"metadata","type":"com.mwiahmon.events.avromon.Metadata"},
{"name":"eventType","type":{"type":"enum","name":"EventType","symbols":["CLIENT_CREATED","CLIENT_UPDATED"]}},
{"name":"payload","type":"ClientPayload"}]}

Here ClientPayload is defined as follows (it includes the optional field pocTitle):

{"type":"record","name":"ClientPayload","namespace":"com.mwiahmon.events.avro.contacts.pms","fields":
[{"name":"contactUUID","type":{"type":"string","logicalType":"uuid"}},{"name":"accountUUID","type":{"type":"string","logicalType":"uuid"}},
{"name":"billingUUID","type":[{"type":"string","logicalType":"uuid"},"null"],"default":null},
{"name":"firstName","type":["string","null"],"default":null},
{"name":"middleName","type":["string","null"],"default":null},
{"name":"title","type":["string","null"],"default":null},
{"name":"pocTitle","type":["string","null"],"default":null},
{"name":"titleUUID","type":[{"type":"string","logicalType":"uuid"},"null"],"default":null},
{"name":"lastName","type":"string"},
{"name":"nickName","type":["string","null"],"default":null},
{"name":"addresses","type":[{"type":"array","items":"Address"},"null"],"default":null},
{"name":"phones","type":[{"type":"array","items":"Phone"},"null"],"default":null},
{"name":"emailAddresses","type":[{"type":"array","items":"EmailAddress"}

I tried adding the following to my Publisher configuration, but it did not help:

properties.put(AbstractKafkaSchemaSerDeConfig.ID_COMPATIBILITY_STRICT, Boolean.FALSE);
properties.put(AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT, Boolean.FALSE);
properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REFLECTION_CONFIG, Boolean.TRUE);

properties.put(KafkaAvroSerializerConfig.AVRO_REFLECTION_ALLOW_NULL_CONFIG, Boolean.TRUE);
properties.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, Boolean.TRUE);
properties.put(KafkaAvroSerializerConfig.AVRO_REMOVE_JAVA_PROPS_CONFIG, Boolean.TRUE);

If I update my Publisher app to V2 of event-entities, it publishes fine. However, from reading the documentation on compatibility levels, my understanding is that with FULL_TRANSITIVE I should be able to register the new schemas, and that adding an optional field should not require updating event-entities in the Publisher.

Thanks in advance for letting me know what I am missing.
