Hey, I am stuck with the same problem here, getting Flink to read from Kafka with Avro and Schema Registry. I can see Flink's requests in the schema-registry logs, but I keep getting the same error on the Flink UI Submit Job tab:
Server Response Message:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: 'org.apache.flink.formats.avro.AvroDeserializationSchema org.apache.flink.formats.avro.AvroDeserializationSchema.forGeneric(org.apache.avro.Schema)'
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
... 2 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.flink.formats.avro.AvroDeserializationSchema org.apache.flink.formats.avro.AvroDeserializationSchema.forGeneric(org.apache.avro.Schema)'
at com.example.DataStreamJob.main(DataStreamJob.java:50)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
... 2 more
Here are the JobManager logs:
2025-01-05 21:13:46,934 INFO org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Registering task executor 10.244.0.5:6122-71d77a under 23ae9ba7acbe7a3d9890b9c258c3098c at the slot manager.
2025-01-05 21:20:21,085 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2025-01-05 21:20:21,484 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2025-01-05 21:20:21,798 WARN org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'state.checkpoints.dir' instead of proper key 'execution.checkpointing.dir'
2025-01-05 21:20:21,810 WARN org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'state.backend' instead of proper key 'state.backend.type'
2025-01-05 21:20:23,542 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application.
Log snippets from the Schema Registry:
[2025-01-06 02:18:40,157] INFO 192.168.31.82 - - [05/Jan/2025:20:48:40 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 95 (io.confluent.rest-utils.requests:62)
[2025-01-06 02:23:30,953] INFO 192.168.31.82 - - [05/Jan/2025:20:53:30 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 175 (io.confluent.rest-utils.requests:62)
[2025-01-06 02:23:35,526] INFO 192.168.31.82 - - [05/Jan/2025:20:53:35 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 19 (io.confluent.rest-utils.requests:62)
What can I do? Here's my pom.xml (I am using Flink 1.20.0):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-kafka-avro</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
<kafka.version>3.9.0</kafka.version>
<confluent.version>7.8.0</confluent.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.0-1.17</version>
</dependency>
-->
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.3.0-1.20</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!-- flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.confluent</groupId>-->
<!-- <artifactId>kafka-schema-serializer</artifactId>-->
<!-- <version>7.8.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- Maven Shade Plugin to create a fat jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- Lifecycle Mapping Plugin for Eclipse -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
My DataStreamJob.java:
package com.example;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
//import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Load properties
        Properties properties = new Properties();
        properties.load(Files.newInputStream(Paths.get("/opt/flink/consumer.properties")));
        String bootstrapServers = properties.getProperty("bootstrap.servers");
        String schemaRegistryUrl = properties.getProperty("schema.registry.url");
        String topicName = properties.getProperty("topic.name");
        String groupId = properties.getProperty("group.id");

        // Initialize Schema Registry Client
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 10);
        Schema avroSchema = null;
        try {
            String subject = topicName + "-value"; // Adjust subject naming as per your setup
            avroSchema = new Schema.Parser().parse(schemaRegistryClient.getLatestSchemaMetadata(subject).getSchema());
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch schema from Schema Registry. Ensure the subject name and registry URL are correct.", e);
        }

        // Set up the Kafka source
        KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topicName)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(AvroDeserializationSchema.forGeneric(avroSchema))
                .build();

        // Add the source to the environment
        DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Process the stream (example transformation)
        stream.map(GenericRecord::toString).print();

        // Execute the Flink job
        env.execute("Flink Kafka Avro Consumer");
    }
}
I am using Flink 1.20.0 and have set up my pom.xml with up-to-date dependencies for all the Kafka and Schema Registry connectivity, but I am still getting this error. What could be the reason?
Please help.
asked by Rajat Sinha

1 Answer
java.lang.NoSuchMethodError is usually caused by version mismatches.
flink-connector-kafka v3.3.0 was built against
<kafka.version>3.4.0</kafka.version>
<confluent.version>7.4.4</confluent.version>
I suspect the problem lies there (but I'm just guessing).
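One way to check is to run mvn dependency:tree -Dincludes=org.apache.avro (and again with -Dincludes=org.apache.flink and -Dincludes=org.apache.kafka) on the project, to see which versions actually end up inside the fat jar. There is also a second suspect: the comment in your pom says the core Flink artifacts "should not be packaged into the JAR file", yet none of them carry the provided scope, so the shaded jar bundles its own copies of Flink classes that can clash with whatever the cluster loads from its own lib/ directory. Below is a minimal sketch of what I would try, assuming a standard cluster that ships the Flink runtime itself; the version numbers are the ones quoted above, so double-check them against the connector's own pom before relying on this:

<properties>
    <!-- Align with what flink-connector-kafka 3.3.0-1.20 was built against -->
    <kafka.version>3.4.0</kafka.version>
    <confluent.version>7.4.4</confluent.version>
</properties>

<!-- The cluster already provides these classes; keep them out of the fat jar -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- likewise for flink-core, flink-clients, flink-runtime and flink-java -->

<!-- flink-avro stays in the default (compile) scope so that
     AvroDeserializationSchema.forGeneric(...) travels with the job,
     unless flink-avro 1.20.0 is already in the cluster's lib/ directory -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>

After adjusting the scopes, rebuild the fat jar and resubmit. If the error persists, the dependency:tree output should point at whichever artifact is still dragging in a conflicting Avro or Flink version.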