Flink 1.14 Stream API (Part 1): A Summary of Common Sources and Sinks


Flink can read from and write to files, sockets, collections, and more. It also ships with many built-in connectors, such as Kafka, Hadoop, and Redis.

1. Common Sources

A source is the operator Flink uses to ingest external data. By the way they obtain data, sources fall into five categories:

  • Collection-based sources

  • Socket-based sources

  • File-based sources

  • Third-party connector sources

  • Custom sources

From the parallelism standpoint, sources also divide into non-parallel and parallel sources (a short sketch follows the list):

  • Non-parallel source: its parallelism can only be 1, i.e. there is exactly one runtime instance. Reading large volumes of data this way is inefficient, so such sources are mostly used for experiments and tests — for example, the socket source.

  • Parallel source: its parallelism can range from 1 to many. Given enough compute resources, the higher the parallelism, the higher the throughput — for example, the Kafka source.
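
A minimal sketch of the difference, usable inside any of the main methods below (the parallelism value 4 is arbitrary; raising the parallelism of a non-parallel source above 1 fails when the job graph is built):

// Non-parallel: socketTextStream always runs as a single instance
DataStreamSource<String> socket = env.socketTextStream("centos01", 9999);
// socket.setParallelism(4);   // would throw: this operator cannot be parallel

// Parallel: fromSequence is backed by a parallel source,
// so its parallelism can be raised freely
DataStreamSource<Long> numbers = env.fromSequence(1, 1000);
numbers.setParallelism(4);
numbers.print();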

1.1 Creating a stream from a collection

package cn.yyds.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _01_ElementsSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);  // default parallelism

        // Create a stream from a collection of elements
        DataStreamSource<Integer> fromElements = env.fromElements(1, 2, 3, 4, 5);
        fromElements.map(d -> d * 10).print();

        env.execute();
    }
}
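
Besides fromElements, an existing Java collection can also be turned into a stream with fromCollection; a minimal sketch that drops into the main method above (assuming java.util.Arrays and java.util.List are imported):

// Build a stream from a java.util.List
List<String> data = Arrays.asList("a", "b", "c");
DataStreamSource<String> fromCollection = env.fromCollection(data);
fromCollection.print();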

1.2 Creating a stream from a socket port

package cn.yyds.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _02_SocketSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);  // default parallelism

        // Create a stream from a socket port.
        // The source operator produced by socketTextStream has a fixed parallelism of 1.
        DataStreamSource<String> socketSource = env.socketTextStream("centos01", 9999);
        socketSource.print();

        env.execute();
    }
}
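
To feed this example, start a socket server on the target host first, e.g. with netcat: nc -lk 9999 (the hostname and port must match the ones passed to socketTextStream).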

1.3 Creating a stream from a file

package cn.yyds.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class _03_TextFileSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);  // default parallelism

        // Create a stream from a file
        DataStreamSource<String> fileSource = env.readTextFile("files/data/wc.txt", "utf-8");
        fileSource.map(String::toUpperCase)/*.print()*/;

        // FileProcessingMode.PROCESS_ONCE: read the file once, compute once, then the program exits.
        // FileProcessingMode.PROCESS_CONTINUOUSLY: watch the file for changes; whenever it changes,
        // the entire file is re-read and re-processed.
        DataStreamSource<String> fileSource2 = env.readFile(new TextInputFormat(null), "files/data/wc.txt",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);
        fileSource2.map(String::toUpperCase).print();

        env.execute();
    }
}

1.4 Reading a stream from Kafka (for production use)

In real production environments, to let Flink read source data efficiently, it is usually combined with a distributed message middleware such as Apache Kafka. Kafka is distributed, multi-replica, highly available, high-throughput, and tracks consumer offsets. Integrating Flink with Kafka allows efficient reads and can guarantee exactly-once semantics.


Required Maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

1.4.1 Legacy API (before Flink 1.14)

/**
 * A utility class that creates a Kafka consumer source
 * for reading data from Kafka.
 */
public class FlinkUtils {

    // The stream execution environment never changes, so it is static final
    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * @param parameterTool the parameter tool holding the job configuration
     * @param schema        the deserialization schema class
     * @param <T>           the element type
     * @return a DataStream of deserialized records
     */
    public static <T> DataStream<T> createKafkaStream(ParameterTool parameterTool, Class<? extends DeserializationSchema<T>> schema) throws Exception {
        // Checkpoint interval from the config; defaults to 30 seconds
        long interval = parameterTool.getLong("checkpoint.interval", 30000L);
        env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
        // Retain externalized checkpoints so computed state is not lost when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Topics to subscribe to
        String[] strings = parameterTool.get("kafka.input.topics").split(",");
        List<String> topics = Arrays.asList(strings);

        Properties properties = parameterTool.getProperties();
        FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics, schema.newInstance(), properties);
        // Do not commit offsets back to Kafka's special offsets topic on checkpoints
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        // Start consuming from the latest Kafka offsets
        flinkKafkaConsumer.setStartFromLatest();

        // Integrate the Kafka consumer as a Flink source
        DataStreamSource<T> dataStreamSource = env.addSource(flinkKafkaConsumer);
        return dataStreamSource;
    }

    /**
     * An overload that takes a KafkaDeserializationSchema. Its
     * T deserialize(ConsumerRecord<byte[], byte[]> record) method receives the full
     * ConsumerRecord, so the topic, partition, and offset are all accessible:
     *
     *   public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
     *   T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
     *
     * @param parameterTool the parameter tool holding the job configuration
     * @param schema        the deserialization schema class
     * @param <T>           the element type
     */
    public static <T> DataStream<T> createKafkaStreamWithId(ParameterTool parameterTool, Class<? extends KafkaDeserializationSchema<T>> schema) throws Exception {
        // Checkpoint interval
        long interval = parameterTool.getLong("checkpoint.interval", 30000L);
        env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);

        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        // rocksDBStateBackend.setDbStoragePath("");
        env.setStateBackend(rocksDBStateBackend);
        // Retain externalized checkpoint data (e.g. on HDFS) when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Create the Kafka consumer
        String[] split = parameterTool.get("kafka.input.topics").split(",");
        List<String> topics = Arrays.asList(split);
        Properties properties = parameterTool.getProperties();
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topics, schema.newInstance(), properties);
        // Do not commit offsets back to Kafka's special offsets topic on checkpoints
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        DataStreamSource<T> dataStreamSource = env.addSource(kafkaConsumer);
        return dataStreamSource;
    }
}

1.4.2 The Flink 1.14 API

package cn.yyds.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class _04_kafkaSourceSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);  // default parallelism

        // Requires the flink-connector-kafka dependency.
        // Read data from Kafka into a stream.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // target topic(s) to subscribe to
                .setTopics("tp01")
                // consumer group id
                .setGroupId("gp01")
                // Kafka broker addresses
                .setBootstrapServers("centos01:9092")
                // Where to start consuming:
                //   OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST): start from the
                //     previously committed offsets (reset to LATEST if none exist)
                //   OffsetsInitializer.earliest(): start from the earliest offsets
                //   OffsetsInitializer.latest(): start from the latest offsets
                //   OffsetsInitializer.offsets(Map): start each partition from the given offset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                // deserializer for the record values
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Enable the underlying Kafka consumer's automatic offset commits:
                //   the latest consumed offsets are committed to Kafka's consumer-offsets topic.
                //   Even with auto-commit enabled, KafkaSource does not depend on it:
                //   after a failure and restart, offsets are restored from Flink's own state first (more reliable).
                .setProperty("auto.offset.commit", "true")
                // Make this source BOUNDED: it reads up to the given position, then stops
                // reading and the job exits. Commonly used for backfills or re-processing
                // a fixed slice of history.
                // .setBounded(OffsetsInitializer.committedOffsets())
                // Make this source UNBOUNDED, but stop reading at the given position without
                // exiting the job. Main use case: read a fixed slice of Kafka data to join
                // against another, truly unbounded stream.
                // .setUnbounded(OffsetsInitializer.latest())
                .build();

        // env.addSource() takes an implementation of the SourceFunction interface;
        // env.fromSource() takes an implementation of the Source interface
        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");
        streamSource.print();

        env.execute();
    }
}

With the new API, Flink records the Kafka consumer's offsets in operator state, which makes the consumed position fault-tolerant and thereby supports end-to-end exactly-once.
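
Because the offsets live in Flink state, manual repositioning is rarely needed; when it is, the per-partition variant OffsetsInitializer.offsets(Map) mentioned in the comments above can be used. A minimal sketch (the topic, partitions, and offset values are made-up):

// assumes: import org.apache.kafka.common.TopicPartition;
//          import java.util.HashMap; import java.util.Map;
Map<TopicPartition, Long> startingOffsets = new HashMap<>();
startingOffsets.put(new TopicPartition("tp01", 0), 1024L);  // partition 0 -> start at offset 1024
startingOffsets.put(new TopicPartition("tp01", 1), 2048L);  // partition 1 -> start at offset 2048

KafkaSource<String> pinnedSource = KafkaSource.<String>builder()
        .setBootstrapServers("centos01:9092")
        .setTopics("tp01")
        .setGroupId("gp01")
        .setStartingOffsets(OffsetsInitializer.offsets(startingOffsets))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();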

1.5 Custom sources

To write a custom source:

  • implement SourceFunction or RichSourceFunction — both yield non-parallel source operators;

  • or implement ParallelSourceFunction or RichParallelSourceFunction — both yield parallel source operators.

Among these source interfaces:

  • the Rich variants additionally provide open(), close(), and getRuntimeContext();

  • the Parallel variants can run as multiple parallel instances.

package cn.yyds.source;

import lombok.*;

import java.util.Map;

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
public class EventLog {
    private long guid;
    private String sessionId;
    private String eventId;
    private long timeStamp;
    private Map<String, String> eventInfo;
}
package cn.yyds.source;

import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _05_SourceFunctionSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
        dataStreamSource.map(JSON::toJSONString).print();

        env.execute();
    }
}
package cn.yyds.source;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;

public class MySourceFunction implements SourceFunction<EventLog> {
    volatile boolean flag = true;

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events = {"appLaunch", "pageLoad", "adShow", "adClick", "itemShare", "itemCollect", "putBack", "wakeUp", "appClose"};
        HashMap<String, String> eventInfoMap = new HashMap<>();

        while (flag) {
            eventLog.setGuid(RandomUtils.nextLong(1, 1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
            eventLog.setTimeStamp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
            eventLog.setEventInfo(eventInfoMap);

            ctx.collect(eventLog);

            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(200, 1500));
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

As the Web UI shows, this source runs with a parallelism of 1.


package cn.yyds.source;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.HashMap;

public class _06_RichParallelSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyRichParallelSourceFunction());
        SingleOutputStreamOperator<String> resStream = dataStreamSource.map(JSON::toJSONString).disableChaining();
        resStream.print();

        env.execute();
    }
}

class MyRichParallelSourceFunction extends RichParallelSourceFunction<EventLog> {
    volatile boolean flag = true;

    /**
     * Source component initialization
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        // The runtime context exposes the name of the task this operator belongs to
        String taskName = runtimeContext.getTaskName();
        // ... and the index of the subtask this instance runs as
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
    }

    /**
     * The data-producing loop (core working logic)
     */
    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events = {"appLaunch", "pageLoad", "adShow", "adClick", "itemShare", "itemCollect", "putBack", "wakeUp", "appClose"};
        HashMap<String, String> eventInfoMap = new HashMap<>();

        while (flag) {
            eventLog.setGuid(RandomUtils.nextLong(1, 1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
            eventLog.setTimeStamp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
            eventLog.setEventInfo(eventInfoMap);

            ctx.collect(eventLog);

            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500, 1500));
        }
    }

    /**
     * Called when the job is cancelled
     */
    @Override
    public void cancel() {
        flag = false;
    }

    /**
     * Called when the component shuts down
     */
    @Override
    public void close() throws Exception {
        System.out.println("Source component closed.....");
    }
}

As the Web UI shows, this source runs with a parallelism of 12.


2. Common Sinks

A sink operator emits the final results of a computation. Different sink operators write to different targets: files, network ports, message middleware, external file systems, or simply the console.

2.1 Writing to files

2.1.1 writeAsText/writeAsCsv

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

public class _01_FileSinkOperator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");
        env.setParallelism(2);

        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // Write to files (note: writeAsText/writeAsCsv are deprecated in newer
        // Flink versions in favor of the FileSink shown in 2.1.2)
        streamSource
                .map(bean -> Tuple5.of(bean.getEventId(), bean.getGuid(), bean.getEventInfo(), bean.getSessionId(), bean.getTimeStamp()))
                .returns(new TypeHint<Tuple5<String, Long, Map<String, String>, String, Long>>() {})
                /*.writeAsCsv("d:/sink_test2", FileSystem.WriteMode.OVERWRITE)*/;

        streamSource.writeAsText("d:/flink/sink_test", FileSystem.WriteMode.OVERWRITE);

        env.execute();
    }
}

2.1.2 StreamFileSink

StreamFileSink not only writes data to a variety of file systems; it also integrates with the checkpoint mechanism to guarantee exactly-once semantics, supports bucketed storage, and can write in columnar formats — altogether far more capable.

A file written by StreamFileSink passes through three states over its lifecycle:

  • in-progress files
  • pending files
  • finished files

An in-progress file becomes pending when it rolls, and pending files are promoted to finished when a checkpoint completes. The required Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>

2.1.2.0 Row-format output

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class _02_StreamSinkRow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // Use the FileSink operator to write the data to a file system, in row format.
        FileSink<String> rowSink = FileSink
                .forRowFormat(new Path("d:/flink/filesink/"), new SimpleStringEncoder<String>("utf-8"))
                // rolling policy: switch files every 10 s, or once a file reaches 5 MB
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(10000)
                        .withMaxPartSize(5 * 1024 * 1024)
                        .build())
                // bucketing strategy (how sub-directories are assigned)
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withBucketCheckInterval(5)
                // output file name configuration
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("yyds")
                        .withPartSuffix(".txt")
                        .build())
                .build();

        // Attach the sink to the stream
        streamSource.map(JSON::toJSONString)
                // .addSink()   // SinkFunction implementations are added with addSink()
                .sinkTo(rowSink);  // Sink implementations are added with sinkTo()

        env.execute();
    }
}

2.1.2.1 Building a ParquetWriterFactory from a schema (columnar Parquet output)

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class _02_StreamSinkDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /*
         * Approach 1, core logic:
         *   - build a Schema
         *   - build a ParquetWriterFactory from the schema
         *   - build a FileSink operator from the ParquetWriterFactory
         *   - convert the original data into a GenericRecord stream and feed it to the FileSink
         */
        // 1. Define the GenericRecord data schema
        Schema schema = SchemaBuilder.builder()
                .record("DataRecord")
                .namespace("cn.yyds.sink.avro.schema")
                .doc("user behavior event data schema")
                .fields()
                .requiredInt("gid")
                .requiredLong("ts")
                .requiredString("eventId")
                .requiredString("sessionId")
                .name("eventInfo")
                    .type()
                    .map()
                    .values()
                    .type("string")
                    .noDefault()
                .endRecord();

        // 2. Build a Parquet writer factory from the schema
        ParquetWriterFactory<GenericRecord> writerFactory = ParquetAvroWriters.forGenericRecord(schema);

        // 3. Use the writer factory to build a sink operator that writes columnar Parquet files
        FileSink<GenericRecord> sink1 = FileSink
                .forBulkFormat(new Path("d:/flink/datasink/"), writerFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("yyds")
                        .withPartSuffix(".parquet")
                        .build())
                .build();

        // 4. Convert the custom-JavaBean stream into the GenericRecord stream the Parquet writer expects
        SingleOutputStreamOperator<GenericRecord> recordStream = streamSource
                .map((MapFunction<EventLog, GenericRecord>) eventLog -> {
                    // build a Record object
                    GenericData.Record record = new GenericData.Record(schema);
                    // fill in the data
                    record.put("gid", (int) eventLog.getGuid());
                    record.put("eventId", eventLog.getEventId());
                    record.put("ts", eventLog.getTimeStamp());
                    record.put("sessionId", eventLog.getSessionId());
                    record.put("eventInfo", eventLog.getEventInfo());
                    return record;
                })
                // Avro classes/objects need Avro's serializer, so an AvroTypeInfo must be
                // supplied explicitly to provide the AvroSerializer
                .returns(new GenericRecordAvroTypeInfo(schema));

        // 5. Output the data
        recordStream.sinkTo(sink1);

        env.execute();
    }
}

2.1.2.2 Building a ParquetWriterFactory from an .avsc file (more cumbersome; columnar Parquet output)

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class _02_StreamSinkDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /*
         * Approach 2, core logic:
         *   - write an .avsc text file (JSON) describing the data schema
         *   - add the Maven code-generator plugin to generate an Avro-specific JavaBean from the .avsc
         *   - build a ParquetWriterFactory from the generated JavaBean
         *   - build a FileSink operator from the ParquetWriterFactory
         *   - convert the original stream into a stream of the generated JavaBean and feed it to the FileSink
         */
        // 1. Put the .avsc file under the resources folder and run the Maven plugin to
        //    generate the specific JavaBean: AvroEventLog.
        //    A JavaBean generated from an .avsc carries its own Schema object:
        //    AvroEventLog avroEventLog = new AvroEventLog();
        //    Schema schema = avroEventLog.getSchema();

        // 2. Build a Parquet writer factory from the generated AvroEventLog class
        ParquetWriterFactory<AvroEventLog> parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(AvroEventLog.class);

        // 3. Use the writer factory to build a sink operator that writes columnar Parquet files
        FileSink<AvroEventLog> bulkSink = FileSink
                .forBulkFormat(new Path("d:/flink/datasink2/"), parquetWriterFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("yyds")
                        .withPartSuffix(".parquet")
                        .build())
                .build();

        // 4. Convert the EventLog stream into the AvroEventLog stream the Parquet writer expects
        SingleOutputStreamOperator<AvroEventLog> avroEventLogStream = streamSource.map(new MapFunction<EventLog, AvroEventLog>() {
            @Override
            public AvroEventLog map(EventLog eventLog) throws Exception {
                HashMap<CharSequence, CharSequence> eventInfo1 = new HashMap<>();
                // copy the entries between the two map types
                Map<String, String> eventInfo2 = eventLog.getEventInfo();
                Set<Map.Entry<String, String>> entries = eventInfo2.entrySet();
                for (Map.Entry<String, String> entry : entries) {
                    eventInfo1.put(entry.getKey(), entry.getValue());
                }
                return new AvroEventLog(eventLog.getGuid(), eventLog.getSessionId(), eventLog.getEventId(), eventLog.getTimeStamp(), eventInfo1);
            }
        });

        // 5. Output the data
        avroEventLogStream.sinkTo(bulkSink);

        env.execute();
    }
}

The .avsc file:

{
    "namespace": "cn.yyds.flink.avro.schema",
    "type": "record",
    "name": "AvroEventLog",
    "fields": [
        {"name": "guid", "type": "long"},
        {"name": "sessionId", "type": "string"},
        {"name": "eventId", "type": "string"},
        {"name": "timeStamp", "type": "long"},
        {"name": "eventInfo", "type": {"type": "map", "values": "string"}}
    ]
}
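
The "Maven code-generator plugin" referenced above is the avro-maven-plugin; a sketch of a typical build configuration (the version and directory paths are assumptions — adjust them to your project):

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <!-- where the .avsc files live -->
                <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                <!-- where the generated JavaBeans go -->
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>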

2.1.2.3 Building a ParquetWriterFactory from your own JavaBean (columnar Parquet output)

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class _02_StreamSinkDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /*
         * Approach 3, core logic:
         *   - build a ParquetWriterFactory directly from your own JavaBean class
         *   - build a FileSink operator from the ParquetWriterFactory
         *   - feed the original stream to the FileSink
         */
        // 2. Build a Parquet writer factory from the JavaBean class (via reflection)
        ParquetWriterFactory<EventLog> parquetWriterFactory = ParquetAvroWriters.forReflectRecord(EventLog.class);

        // 3. Use the writer factory to build a sink operator that writes columnar Parquet files
        FileSink<EventLog> bulkSink = FileSink
                .forBulkFormat(new Path("d:/flink/datasink3/"), parquetWriterFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("yyds")
                        .withPartSuffix(".parquet")
                        .build())
                .build();

        // 5. Output the data
        streamSource.sinkTo(bulkSink);

        env.execute();
    }
}

2.2 Writing to Kafka

2.2.1 Legacy API (deprecated as of Flink 1.14)

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class _03_KafkaSinkOld {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
        SingleOutputStreamOperator<String> mapStream = streamSource.map(JSON::toJSONString);

        // target Kafka topic
        String topic = "test";

        // Kafka connection parameters
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "centos01:9092,centos02:9092,centos03:9092");

        // Create the Kafka producer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,  // target topic
                new KafkaSerializationSchema<String>() {  // serialization schema for writing to Kafka
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long aLong) {
                        return new ProducerRecord<>(topic, element.getBytes());
                    }
                },
                prop,  // Kafka parameters
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE  // exactly-once semantics
        );

        // Add the Kafka sink
        mapStream.addSink(kafkaProducer);

        env.execute();
    }
}

2.2.2 New API

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _04_KafkaSinkNew {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // Write the data to Kafka
        // 1. Build a Kafka sink operator
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("centos01:9092,centos02:9092,centos03:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("event-log")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setTransactionalIdPrefix("yyds-")
                .build();

        // 2. Attach the stream to the sink
        streamSource.map(JSON::toJSONString).disableChaining()
                .sinkTo(kafkaSink);

        env.execute();
    }
}

KafkaSink can work with Flink's checkpoint mechanism to support end-to-end exactly-once semantics; under the hood it relies on the Kafka producer's transaction mechanism.
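
A minimal sketch of switching the sink above to exactly-once (the transaction timeout value is an assumption; it must exceed the checkpoint interval and stay within the broker's transaction.max.timeout.ms):

KafkaSink<String> eosSink = KafkaSink.<String>builder()
        .setBootstrapServers("centos01:9092,centos02:9092,centos03:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("event-log")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // write inside Kafka transactions that commit when a checkpoint completes
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // each job needs a unique transactional.id prefix
        .setTransactionalIdPrefix("yyds-eos-")
        // producer transaction timeout (assumed value)
        .setProperty("transaction.timeout.ms", "600000")
        .build();

Note that downstream consumers must read with isolation.level=read_committed, otherwise they will also see records from uncommitted (and possibly aborted) transactions.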

2.3 JdbcSink
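
The JDBC sink lives in its own connector artifact. A typical dependency block (the MySQL driver coordinates and version are assumptions; a 5.1.x driver matches the com.mysql.jdbc.jdbc2.optional.MysqlXADataSource import used in 2.3.2):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JDBC driver for the target database; version is an assumption -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>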

2.3.1 Without EOS (exactly-once) guarantees

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class _05_JdbcSinkOperatorNoEOS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // 1. A sink that does NOT guarantee EOS semantics
        SinkFunction<EventLog> jdbcSink = JdbcSink.sink(
                "insert into event_log values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
                new JdbcStatementBuilder<EventLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                        preparedStatement.setLong(1, eventLog.getGuid());
                        preparedStatement.setString(2, eventLog.getSessionId());
                        preparedStatement.setString(3, eventLog.getEventId());
                        preparedStatement.setLong(4, eventLog.getTimeStamp());
                        preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
                        preparedStatement.setLong(6, eventLog.getGuid());
                        preparedStatement.setString(7, eventLog.getSessionId());
                        preparedStatement.setString(8, eventLog.getEventId());
                        preparedStatement.setLong(9, eventLog.getTimeStamp());
                        preparedStatement.setString(10, JSON.toJSONString(eventLog.getEventInfo()));
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3)
                        .withBatchSize(1)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        );

        // Output the data
        streamSource.addSink(jdbcSink);

        env.execute();
    }
}

2.3.2 A sink with EOS guarantees

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;

import javax.sql.XADataSource;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class _05_JdbcSinkOperatorEOS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // 2. A sink that DOES provide EOS semantics
        SinkFunction<EventLog> exactlyOnceSink = JdbcSink.exactlyOnceSink(
                "insert into event_log values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
                new JdbcStatementBuilder<EventLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                        preparedStatement.setLong(1, eventLog.getGuid());
                        preparedStatement.setString(2, eventLog.getSessionId());
                        preparedStatement.setString(3, eventLog.getEventId());
                        preparedStatement.setLong(4, eventLog.getTimeStamp());
                        preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
                        preparedStatement.setLong(6, eventLog.getGuid());
                        preparedStatement.setString(7, eventLog.getSessionId());
                        preparedStatement.setString(8, eventLog.getEventId());
                        preparedStatement.setLong(9, eventLog.getTimeStamp());
                        preparedStatement.setString(10, JSON.toJSONString(eventLog.getEventInfo()));
                    }
                },
                JdbcExecutionOptions.builder()
                        // the XA sink forbids retries; maxRetries must be 0
                        .withMaxRetries(0)
                        .withBatchSize(1)
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        // MySQL does not support multiple parallel transactions on one
                        // connection, so this must be set to true
                        .withTransactionPerConnection(true)
                        .build(),
                new SerializableSupplier<XADataSource>() {
                    @Override
                    public XADataSource get() {
                        // An XADataSource is a JDBC connection that supports distributed
                        // transactions; its construction differs per database
                        MysqlXADataSource xaDataSource = new MysqlXADataSource();
                        xaDataSource.setUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8");
                        xaDataSource.setUser("root");
                        xaDataSource.setPassword("root");
                        return xaDataSource;
                    }
                }
        );

        // Output the data
        streamSource.addSink(exactlyOnceSink);

        env.execute();
    }
}

2.4 Redis Sink


Required dependency (from Apache Bahir):

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class _06_RedisSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // Build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // Insert the EventLog data into Redis
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("centos01").build();
        RedisSink<EventLog> redisSink = new RedisSink<>(config, new StringInsertMapper());
        streamSource.addSink(redisSink);

        env.execute();
    }

    static class StringInsertMapper implements RedisMapper<EventLog> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        /**
         * For Redis structures without an inner key, this returns the (outer) key.
         * For structures with an inner key (e.g. HSET), this returns the inner key,
         * while the value passed to the RedisCommandDescription above becomes the outer key.
         */
        @Override
        public String getKeyFromData(EventLog data) {
            return data.getGuid() + "-" + data.getSessionId() + "-" + data.getTimeStamp();  // the key of the string entry
        }

        @Override
        public String getValueFromData(EventLog data) {
            return JSON.toJSONString(data);  // the value of the string entry
        }
    }
}
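
For the HSET case mentioned in the comment above, a mapper might look like this sketch (the hash name "event-logs" is an arbitrary choice):

// A hash-based mapper: "event-logs" is the outer (hash) key,
// and getKeyFromData supplies the field inside the hash.
static class HashInsertMapper implements RedisMapper<EventLog> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "event-logs");
    }

    @Override
    public String getKeyFromData(EventLog data) {
        return data.getGuid() + "-" + data.getSessionId();  // field inside the hash
    }

    @Override
    public String getValueFromData(EventLog data) {
        return JSON.toJSONString(data);
    }
}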
