admin管理员组文章数量:1122853
Flink
什么是连接器
预定义的源和接收器
Flink内置了一些基本数据源和接收器,这些数据源和接收器始终可用。该预定义的数据源包括文件、Mysql、RabbitMq、Kafka、ES
等,同时也支持数据输出到文件、Mysql、RabbitMq、Kafka、ES等。
简单的说:flink连接器就是将某些数据源加载
与数据输出
做了封装(连接器),我们只要引入对应的连接器依赖,即可快速的完成对数据源的加载以及数据的输出。
例如我们使用JDBC 连接器,即可快速的使用JDBC从数据库中加载数据源并支持数据通过JDBC 输出到我们的数据库。
使用JDBC连接器 快速输出到Mysql
必要依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.12.2</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>
编码实现
package com.leilei;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;/*** @author lei* @version 1.0* @date 2021/3/13 21:07* @desc flink jdbc 连接器*/
public class FlinkConnector1_JDBC {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);DataStreamSource<VehicleAlarm> source = env.addSource(new MySource());String sql = "insert into vehicle_alarm_202103 (`id`,`license_plate`,`plate_color`,`device_time`,`zone`) " +"values(?,?,?,?,?)";JdbcConnectionOptions jdbcBuild = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://xxxx:3306/alarm-sc?useUnicode=true&characterEncoding=utf-8&useSSL=false").withUsername("root").withPassword("root").build();/*** 使用JDBC连接器 将数据发送到Mysql*/source.addSink(JdbcSink.sink(sql, (ps, vehicle) -> {ps.setString(1, vehicle.getId());ps.setString(2, vehicle.getLicensePlate());ps.setString(3, vehicle.getPlateColor());ps.setLong(4, vehicle.getDeviceTime());ps.setString(5, vehicle.getZone());}, jdbcBuild));try {env.execute("jdbc-connector");} catch (Exception e) {e.printStackTrace();}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class VehicleAlarm {private String id;private String licensePlate;private String plateColor;private Long deviceTime;private String zone;}/*** 自定义数据源*/public static class MySource extends RichSourceFunction<VehicleAlarm> {@Overridepublic void run(SourceContext<VehicleAlarm> ctx) throws Exception {long id = System.currentTimeMillis()/1000;VehicleAlarm vehicleAlarm = new VehicleAlarm(String.valueOf(id), "川A" + id,"紫", System.currentTimeMillis(), "sc");ctx.collect(vehicleAlarm);Thread.sleep(10000);}@Overridepublic void cancel() {}}}
本文标签: Flink
版权声明:本文标题:Flink 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1686568340a11394.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论