admin管理员组

文章数量:1122853

Flink

什么是连接器

预定义的源和接收器

​ Flink内置了一些基本数据源和接收器,这些数据源和接收器始终可用。该预定义的数据源包括文件、Mysql、RabbitMq、Kafka、ES

等,同时也支持数据输出到文件、Mysql、RabbitMq、Kafka、ES等。

​ 简单的说:flink连接器就是将某些数据源加载数据输出做了封装(连接器),我们只要引入对应的连接器依赖,即可快速的完成对数据源的加载以及数据的输出。

​ 例如我们使用JDBC 连接器,即可快速的使用JDBC从数据库中加载数据源并支持数据通过JDBC 输出到我们的数据库。

使用JDBC连接器 快速输出到Mysql

必要依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.12.2</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>

编码实现

package com.leilei;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;/*** @author lei* @version 1.0* @date 2021/3/13 21:07* @desc flink jdbc 连接器*/
public class FlinkConnector1_JDBC {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);DataStreamSource<VehicleAlarm> source = env.addSource(new MySource());String sql = "insert into vehicle_alarm_202103 (`id`,`license_plate`,`plate_color`,`device_time`,`zone`) " +"values(?,?,?,?,?)";JdbcConnectionOptions jdbcBuild = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://xxxx:3306/alarm-sc?useUnicode=true&characterEncoding=utf-8&useSSL=false").withUsername("root").withPassword("root").build();/*** 使用JDBC连接器 将数据发送到Mysql*/source.addSink(JdbcSink.sink(sql, (ps, vehicle) -> {ps.setString(1, vehicle.getId());ps.setString(2, vehicle.getLicensePlate());ps.setString(3, vehicle.getPlateColor());ps.setLong(4, vehicle.getDeviceTime());ps.setString(5, vehicle.getZone());}, jdbcBuild));try {env.execute("jdbc-connector");} catch (Exception e) {e.printStackTrace();}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class VehicleAlarm {private String id;private String licensePlate;private String plateColor;private Long deviceTime;private String zone;}/*** 自定义数据源*/public static class MySource extends RichSourceFunction<VehicleAlarm> {@Overridepublic void run(SourceContext<VehicleAlarm> ctx) throws Exception {long id = System.currentTimeMillis()/1000;VehicleAlarm vehicleAlarm = new VehicleAlarm(String.valueOf(id), "川A" + id,"紫", System.currentTimeMillis(), "sc");ctx.collect(vehicleAlarm);Thread.sleep(10000);}@Overridepublic void cancel() {}}}

本文标签: Flink