I have a custom JdbcDialect, and I am trying to get it registered in my Spark (v3.5.1) cluster.
The dialect works perfectly fine in local mode. Additionally, the canHandle method is implemented to always return true for now; this is how I excluded DB URL matching and dialect selection from the equation.
The Spark cluster also successfully retrieves data from the DB whenever I execute queries that don't require dialect customizations, which is how I made sure the cluster is healthy.
I have prepared a minimal, reproducible example.
Dialect implementation:
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.MySQLDialect;
import scala.Option;
public class MemSQL5Dialect extends JdbcDialect {

    private static class SQLBuilder extends MySQLDialect.MySQLSQLBuilder {
        // My customizations
    }

    @Override
    public Option<String> compileExpression(Expression expr) {
        try {
            return Option.apply(new SQLBuilder().build(expr));
        } catch (Exception e) {
            return Option.empty();
        }
    }

    @Override
    public boolean canHandle(String url) {
        return true;
    }
}
Application implementation:
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;
import java.net.Inet4Address;
import java.util.Properties;
import static org.apache.spark.sql.functions.col;
public class Application {

    public static void main(String[] args) throws Exception {
        // Cluster is running using Docker, so I use IPv4.
        var hostAddress = Inet4Address.getLocalHost().getHostAddress();

        // JAR with the dialect is copied to the folder inside the Docker container where Bitnami Spark
        // keeps all other JARs. Double-checked the JAR is there and that it includes the dialect!
        var applicationJarWithDialect = "/opt/bitnami/spark/jars/spark-playground-SNAPSHOT.jar";

        var sparkSession = SparkSession.builder()
                .appName("Spark Playground")
                // When set to "local[*]" works perfectly fine!
                .master("spark://localhost:7077")
                .config("spark.driver.host", hostAddress)
                // I also tried the 2 properties below - did not help.
                // .config("spark.driver.extraClassPath", applicationJarWithDialect)
                // .config("spark.executor.extraClassPath", applicationJarWithDialect)
                .getOrCreate();

        // Trying to register the dialect on the driver.
        JdbcDialects.registerDialect(new MemSQL5Dialect());

        var jdbcUrl = String.format("jdbc:mariadb://%s:3306/db", hostAddress);
        var jdbcUsername = "root";
        var jdbcPassword = "root";
        var tableName = "phonebook";

        var properties = new Properties();
        properties.put("user", jdbcUsername);
        properties.put("password", jdbcPassword);
        properties.put("driver", "org.mariadb.jdbc.Driver");

        try {
            sparkSession.read().jdbc(jdbcUrl, tableName, properties)
                    // If I change the filter to something that doesn't require
                    // dialect changes -- works fine as well!
                    .filter(col("first_name").startsWith("J"))
                    .collectAsList()
                    .forEach(System.out::println);
        } finally {
            sparkSession.close();
        }
    }
}
Please help me understand what I am missing.
1 Answer
It turned out there were only 2 things that really mattered:
The application JAR with the custom dialect had to be accessible to the Spark workers, and the same goes for the DB driver. I put both into the folder where all the other Spark dependency JARs are located.
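(As a sketch of an alternative, untested here: the spark.jars property makes Spark distribute the listed JARs to the driver and executor classpaths itself, instead of copying them into the image by hand. It reuses hostAddress and applicationJarWithDialect from the question, and the MariaDB driver path is a placeholder.)
var sparkSession = SparkSession.builder()
        .appName("Spark Playground")
        .master("spark://localhost:7077")
        .config("spark.driver.host", hostAddress)
        // Comma-separated list of JARs that Spark ships to the executors.
        // The MariaDB driver file name below is hypothetical - adjust it to the real one.
        .config("spark.jars", applicationJarWithDialect + ",/path/to/mariadb-java-client.jar")
        .getOrCreate();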
(The key thing I missed) The dialect registration had to occur on every worker, so I replaced
JdbcDialects.registerDialect(new MemSQL5Dialect());
with
new JavaSparkContext(sparkSession.sparkContext())
        .parallelize(List.of(1))
        .foreachPartition((VoidFunction<Iterator<Integer>>) integerIterator -> {
            JdbcDialects.registerDialect(new MemSQL5Dialect());
        });
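For completeness, a minimal sketch of the same registration step with the imports it needs, assuming the standard Spark Java API; treat it as my reading of the snippet above rather than a verified drop-in:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.jdbc.JdbcDialects;
import java.util.Iterator;
import java.util.List;

// ... inside main(), right after the SparkSession is created:
// Run a tiny job so the registration code executes on the executors, not only on the driver.
new JavaSparkContext(sparkSession.sparkContext())
        .parallelize(List.of(1))
        .foreachPartition((VoidFunction<Iterator<Integer>>) ignored ->
                JdbcDialects.registerDialect(new MemSQL5Dialect()));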