admin管理员组文章数量:1125805
StreamTaskException: ClassNotFoundException for RandomBinaryGenerator
Problem Description
I encountered a StreamTaskException while trying to execute the Flink job using the following command:
./flink run flink-stream-app/target/flink-stream-app-1.0-SNAPSHOT.jar
Error Summary
The error states that the RandomBinaryGenerator class could not be loaded. Below is the relevant portion of the stack trace:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.example.RandomBinaryGenerator Caused by: java.lang.ClassNotFoundException: org.example.RandomBinaryGenerator
This exception occurred during job recovery as described by the FixedDelayRestartBackoffTimeStrategy. The class loader failed to resolve the RandomBinaryGenerator class. Verification
The JAR seems to contain the required class:
jar tvf target/flink-stream-app-1.0-SNAPSHOT.jar | grep RandomBinaryGenerator
1855 Thu Jan 09 14:48:48 CET 2025 org/example/RandomBinaryGenerator.class
Running a DataStream in a Table Environment with a Temporary View
Objectiveyour text
I am trying to run a DataStream in a Table Environment with a temporary view. My goal is to have the DataStream continuously running, so I can integrate it later using jdbc_fdw
. The current content of the DataStream is not relevant for this phase.
Code
Main Application Code
package org.example;
import org.apache.flink.apimon.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class App {
public static void main(String[] args) throws Exception {
String ipAddress = "127.0.0.1";
int port = 8083;
Configuration configuration = new Configuration();
// Setting up the configuration
configuration.setString("rest.address", ipAddress);
configuration.setInteger("rest.port", port);
configuration.setInteger("taskmanager.numberOfTaskSlots", 4);
configuration.setInteger("taskmanager.memory.process.size", 1024);
configuration.setString("table.exec.resource.default-parallelism", "1");
configuration.setBoolean("table.dynamic-table-options.enabled", true);
configuration.setString("restart-strategy", "fixed-delay");
configuration.setString("restart-strategy.fixed-delay.attempts", "3");
configuration.setString("restart-strategy.fixed-delay.delay", "10s");
// MiniCluster configuration
MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(1)
.build();
MiniCluster miniCluster = new MiniCluster(miniClusterConfig);
miniCluster.start();
try {
// Creating StreamExecutionEnvironment and StreamTableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Adding a source stream
DataStream<Integer> sourceStream = env.addSource(new RandomBinaryGenerator());
// Registering a catalog and using it
tEnv.executeSql("CREATE CATALOG myCatalog WITH ('type'='generic_in_memory')");
tEnv.useCatalog("myCatalog");
// Printing the stream to console
sourceStream.print().name("Console Sink");
// Converting DataStream to Table and creating a temporary view
Table sourceTable = tEnv.fromDataStream(sourceStream,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.build());
tEnv.createTemporaryView("stream_job", sourceTable);
// Executing the job and starting the Flink cluster
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobID jobId = miniCluster.submitJob(jobGraph).get().getJobID();
System.out.println("Flink Job started with JobID: " + jobId);
System.out.println("Flink Cluster running at: http://" + ipAddress + ":" + port);
System.out.println("Press Enter to stop...");
System.in.read();
} finally {
// Closing the cluster
miniCluster.close();
}
}
}
Random Binary Generator Source
package org.example;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.Serializable;
public class RandomBinaryGenerator implements SourceFunction<Integer>, Serializable {
private static final long serialVersionUID = 1L;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (isRunning) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect((int) (Math.random() * 2));
}
Thread.sleep(1000); // Simulating data generation delay
}
}
@Override
public void cancel() {
isRunning = false;
}
}
本文标签: javaFlink encounters StreamTaskExeption because it cant resolve classStack Overflow
版权声明:本文标题:java - Flink encounters StreamTaskExeption because it cant resolve class - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736637273a1945911.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论