StreamTaskException: ClassNotFoundException for RandomBinaryGenerator

Problem Description

I encountered a StreamTaskException while trying to execute the Flink job using the following command:


./flink run flink-stream-app/target/flink-stream-app-1.0-SNAPSHOT.jar

Error Summary

The error states that the RandomBinaryGenerator class could not be loaded. Below is the relevant portion of the stack trace:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.example.RandomBinaryGenerator
Caused by: java.lang.ClassNotFoundException: org.example.RandomBinaryGenerator

The exception occurred during job recovery, as reported by the FixedDelayRestartBackoffTimeStrategy: the class loader failed to resolve the RandomBinaryGenerator class.

Verification

The JAR seems to contain the required class:

jar tvf target/flink-stream-app-1.0-SNAPSHOT.jar | grep RandomBinaryGenerator
  1855 Thu Jan 09 14:48:48 CET 2025 org/example/RandomBinaryGenerator.class
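
For completeness, a quick reflective lookup can show whether the class is visible to the driver-side classloader at all. This is only a diagnostic sketch (the ClassLoadCheck class is not part of the project):

package org.example;

// Diagnostic sketch: tries to resolve RandomBinaryGenerator with the classloader
// of the submitting JVM. If this fails as well, the problem is the local
// classpath rather than the classloader used inside the submitted job.
public class ClassLoadCheck {
    public static void main(String[] args) {
        try {
            Class<?> clazz = Class.forName("org.example.RandomBinaryGenerator");
            System.out.println("Resolved " + clazz.getName() + " from "
                    + clazz.getProtectionDomain().getCodeSource().getLocation());
        } catch (ClassNotFoundException e) {
            System.err.println("Not visible to the driver classpath: " + e.getMessage());
        }
    }
}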

Running a DataStream in a Table Environment with a Temporary View

Objective

I am trying to run a DataStream in a Table Environment with a temporary view. My goal is to keep the DataStream running continuously so that I can later integrate it via jdbc_fdw. The actual content of the DataStream is not relevant at this stage.
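
For context, querying the temporary view from within Flink would look roughly like the fragment below. It is illustrative only; the view name stream_job and column f0 match the main application code that follows.

// Illustrative fragment only: reads the temporary view registered in the main
// application below. "stream_job" and column "f0" are the names used there.
static void previewView(org.apache.flink.table.api.bridge.java.StreamTableEnvironment tEnv) {
    org.apache.flink.table.api.Table result = tEnv.sqlQuery("SELECT f0 FROM stream_job");
    result.execute().print(); // prints rows to stdout until the query is cancelled
}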

Code

Main Application Code

package org.example;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class App {
    public static void main(String[] args) throws Exception {
        String ipAddress = "127.0.0.1";
        int port = 8083;
        Configuration configuration = new Configuration();

        // Setting up the configuration
        configuration.setString("rest.address", ipAddress);
        configuration.setInteger("rest.port", port);
        configuration.setInteger("taskmanager.numberOfTaskSlots", 4);
        configuration.setInteger("taskmanager.memory.process.size", 1024);
        configuration.setString("table.exec.resource.default-parallelism", "1");
        configuration.setBoolean("table.dynamic-table-options.enabled", true);
        configuration.setString("restart-strategy", "fixed-delay");
        configuration.setString("restart-strategy.fixed-delay.attempts", "3");
        configuration.setString("restart-strategy.fixed-delay.delay", "10s");

        // MiniCluster configuration
        MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
                .setConfiguration(configuration)
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(1)
                .build();

        MiniCluster miniCluster = new MiniCluster(miniClusterConfig);
        miniCluster.start();

        try {
            // Creating StreamExecutionEnvironment and StreamTableEnvironment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            env.setParallelism(1);

            // Adding a source stream
            DataStream<Integer> sourceStream = env.addSource(new RandomBinaryGenerator());

            // Registering a catalog and using it
            tEnv.executeSql("CREATE CATALOG myCatalog WITH ('type'='generic_in_memory')");
            tEnv.useCatalog("myCatalog");

            // Printing the stream to console
            sourceStream.print().name("Console Sink");

            // Converting DataStream to Table and creating a temporary view
            Table sourceTable = tEnv.fromDataStream(sourceStream,
                    Schema.newBuilder()
                            .column("f0", DataTypes.INT())
                            .build());
            tEnv.createTemporaryView("stream_job", sourceTable);

            // Executing the job and starting the Flink cluster
            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            JobID jobId = miniCluster.submitJob(jobGraph).get().getJobID();

            System.out.println("Flink Job started with JobID: " + jobId);
            System.out.println("Flink Cluster running at: http://" + ipAddress + ":" + port);
            System.out.println("Press Enter to stop...");
            System.in.read();

        } finally {
            // Closing the cluster
            miniCluster.close();
        }
    }
}
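
One thing that may matter here: when a JobGraph is submitted to the MiniCluster programmatically, the user jar is not necessarily attached to it. A minimal sketch of attaching it explicitly before submission is shown below (untested; the jar path is hypothetical):

// Untested sketch: attach the user jar to the JobGraph before submitting it,
// so the cluster-side classloader can resolve org.example.RandomBinaryGenerator.
// The file path below is hypothetical and must point at the built artifact.
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
jobGraph.addJars(java.util.Collections.singletonList(
        new java.net.URL("file:///path/to/flink-stream-app-1.0-SNAPSHOT.jar")));
JobID jobId = miniCluster.submitJob(jobGraph).get().getJobID();

The pipeline.jars configuration option may be an alternative way to achieve the same thing, but I have not verified either approach.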

Random Binary Generator Source

package org.example;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.Serializable;

public class RandomBinaryGenerator implements SourceFunction<Integer>, Serializable {
    private static final long serialVersionUID = 1L;
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect((int) (Math.random() * 2));
            }
            Thread.sleep(1000); // Simulating data generation delay
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
