跳至主要內容

Flink

fatSheep大约 10 分钟

Flink简介

基本概念

Apache Flink是一个开源的流处理框架,应用于分布式、高性能、高可用的数据流应用程序。可以处理有限数据流和无限数据,即能够处理有边界和无边界的数据流。无边界的数据流就是真正意义上的流数据,所以Flink是支持流计算的。有边界的数据流就是批数据,所以也支持批处理的。不过Flink在流处理上的应用比在批处理上的应用更加广泛,统一批处理和流处理也是Flink目标之一。Flink可以部署在各种集群环境,可以对各种大小规模的数据进行快速计算。

大数据(Big Data):指无法在一定时间内用常规软件工具对其进行获取、存储、管理和处理的数据集合. - 价值化:价值密度低、整体价值高 - 海量化:数据总量大 - 快速化:数据产生、处理的速度快 - 多样化:数据源、数据种类多(格式化、半格式化、结构化数据)

大数据计算框架发展历史

为什么需要流式计算

大数据的实时性带来价值更大

  • 监控场景:如果能实时发现业务系统的健康状态,就能提前避免业务故障:
  • 金融风控:如果实时监测出异常交易的行为,就能及时阻断风险的发生;
  • 实时推荐:比如在抖音,如果可以根据用户的行为数据发掘用户的兴趣、偏好,就能向用户推荐更感兴趣的内容;

大数据实时性的需求,带来了大数据计算架构模式的变化

  • 批示处理

流处理与批处理区别

  • 期望目标:
    • 低延迟
    • 高吞吐
    • 结果的准确性和良好的容错性
流式计算批式计算
个人理解他人发微信实时发送,实时接收 ,更真实地反映了我们的生活方式闲暇时,将之前接收到的微信消息一并查看、处理。
计算方式实时计算离线计算
处理时间延迟在秒级以内处理时间为分钟到小时级别,甚至天级别
处理延迟0~1s10s~1h+
应用场景广告推荐、金融风控搜索引擎索引构建、批式数据分析
数据流无限数据集有限数据集
时延低延迟、业务会感知运行中的情况实时性要求不高,只关注最终结果产出时间

Flink特征

  1. 一切皆为流 事件驱动应用(Event-driven Applications)
  2. 正确性保证
  • 唯一状态一致性(Exactly-once state consistency)
  • 事件-事件处理(Event-time processing)
  • 高超的最近数据处理(Sophisticated late data handling)
  1. 多层api
  • 基于流式和批量数据处理的SQL(SQL on Stream & Batch Data)
  • 流水数据API & 数据集API(DataStream API & DataSet API)
  • 处理函数 (时间 & 状态)(ProcessFunction (Time & State))

4.易用性

  • 部署灵活(Flexible deployment)
  • 高可用安装(High-availability setup)
  • 保存点(Savepoints)

5.可扩展性

  • 可扩展架构(Scale-out architecture)
  • 大量状态的支持(Support for very large state)
  • 增量检查点(Incremental checkpointing)

6.高性能

  • 低延迟(Low latency)
  • 高吞吐量(High throughput)
  • 内存计算(In-Memory computing)

数据处理架构

  • 数据模型
    • Spark采用RDD模型,spark streaming 的DStream实际上也就是一组组小批数据RDD的集合
    • Flink基本数据模型是数据流,以及事件(Event)序列
  • 运行时架构
    • spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
    • flink是标准的流执行模式,一个时间在一个节点处理完后可以直接发往下一个节点进行处理

流式计算引擎对比

StormSpark StreamingFlink
Streaming ModelNativemini-batchNative
一致性保证At Least/Most OnceExactly-OnceExactly-Once
延迟低延迟(毫秒级)延迟较高(秒级)低延迟(毫秒级)
吞吐LowHighHigh
容错ACKRDD Based CheckpointCheckpoint(Chandy-Lamport)
StateFulNoYes(DStream)Yes(Operator)
SQL支持NoYesYes

Flink整体架构

分层架构

核心组件

一个Flink集群,主要包含以下两个核心组件:

  1. JobManager(JM):负责整个任务的协调工作 包括:调度task、触发协调Task做Checkpoint. 协调容错恢复等;
  2. TaskManager(TM):负责执行一个DataFlow Graph的各个task以及data streams的buffer和 数据交换。

JobManager职责

  • Dispatcher:接收作业,拉起JobManager来执行作业,并在JobMaster挂掉之后恢复作业;
  • JobMaster:管理一个job的整个生命周期,会向ResourceManager申请slot,并将task调度到对应TM上;
  • ResourceManager:负责slot资源的管理和调度,Task manager拉起之后会向RM注册;

Flink作业示例

Flink流批一体

为什么需要流批一体

  • 流处理:在抖音中,实时统计一个短视频的播放量、点赞数 也包括抖音直播间的实时观看人数等;
  • 批处理:在抖音中,按天统计创造者的一些数据信息,比如 昨天的播放量有多少、评论量多少、广告收入多少; 工作流程: 上述架构痛点:
  • 人力成本比较高:批、流两套系统,相同逻辑需要开发两遍;
  • 数据链路冗余:本身计算内容是一致的,由于是两套链路,相同逻辑需要运行两遍,产生一定的资源浪费;
  • 数据口径不一致:两套系统、两套算子、两套UDF,通常会产生不同程度的误差,这些误差会给业务方带来非常大的困扰。

如何做到流批一体

  • 批式计算是流式计算的特例,Everything is Streams,有界数据集(批式数据)也是一种数据流、一种特殊的数据流
  • 从模块支持:
    • SQL层;
    • DataStream API层统一,批和流都可以使用DataStream API来开发.
    • Scheduler层架构统一,支持流批场景;
    • Failover Recovery层架构统一,支持流批场景
    • Shuffle Service层架构统一,流批场景选择不同的Shuffle Service;

流批一体的Scheduler层

1.12 之前的 Flink 版本,Flink 支持两种调度模式:

模式特点场景
EAGER申请一个作业所需要的全部资源,然后同时调度这个作业的全部 Task,所有的 Task 之间采取 Pipeline 的方式进行通信;Streaming 场景
LAZY先调度上游,等待上游产生数据或结束后再调度下游,类似 Spark 的 Stage 执行模式。Batch 场景

流批一体的Shuffle Service层

Shuffle:在分布式计算中,用来连接上下游数据交互的过程叫做Shuffle。实际上,分布式计算中所有涉及到上下游衔接的过程,都可以理解为 Shuffle;

针对不同的分布式计算框架,Shuffle通常有几种不同的实现:

  1. 基于文性的Pull Based Shuffle,比如Spark或MR,它的特点是具有较高的容错性,适合较大规模的批处理作业,由于是基于文件的,它的容错性和稳定性会更好一些;
  2. 基于Pipeline的Push Based Shuffle,比如Flink、Storm、.Presto等,它的特点是低延迟和高性能,但是因为shuffle数据没有存储下来,如果是batch任务的话,就需要进行重跑恢复;

流批Shuffle差异

流计算批计算
生命周期Shuffle数据与Task绑定Shuffle数据与Task是解耦的
数据存储介质生命周期短,实时性要求,通常存储在内存数据量大,有容错要求,存储在硬盘
部署方式服务和计算节点部署在一起,减少网络开销(减少latency)分开部署(本机可能宕机,重跑代价大)

流/批/OLAP业务场景概述

流式计算批式计算交互式分析
实时计算离线计算OLAP
延迟秒级以内处理时间分钟到小时,甚至天级别处理时间秒级
0~1s10s~1h+1~10s
广告推荐、金融风控搜索引擎构建索引、批式数据分析数据分析BI报表
三种业务场景的解决方案的要求及挑战是
模块流式计算批式计算
--------------------------------------------------------------------------------
SQLYesYes
实时性高,处理延迟毫秒级别
容错能力中,大作业失败重跑代价高
状态YesNo
准确性Exactly Once,要求高,失败重跑需要恢复之前的状态Exactly Once,失败重跑即可
扩展性YesYes

Flink做OLAP的优势

  • 引擎统一
    • 降低学习成本
    • 提高开发效率
    • 提高维护效率
  • 既有优势
    • 内存计算
    • Code-gen
    • Pipeline Shuffle
    • Session模式的MPP架构
  • 生态支持
    • 跨数据源查询支持
    • TCP-DS基准测试性能强

Flink快速上手

第一个项目

  1. 新建maven项目 点击左上角"文件",点击"新建"后选择"项目"
  2. 配置pom.xml
<properties>  
    <flink.version>1.13.0</flink.version>  
    <java.version>1.13.0</java.version>  
    <scala.binary.version>2.12</scala.binary.version>  
    <slf4j.version>1.7.30</slf4j.version>  
    <maven.compiler.source>8</maven.compiler.source>  
    <maven.compiler.target>8</maven.compiler.target>  
</properties>  
<dependencies>  
    <!--引入Flink相关依赖-->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-java</artifactId>  
        <version>${flink.version}</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-streaming-java_2.12</artifactId>  
        <version>${flink.version}</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-clients_${scala.binary.version}</artifactId>  
        <version>${flink.version}</version>  
    </dependency>  
    <!--引入日志管理相关依赖-->  
    <dependency>  
        <groupId>org.slf4j</groupId>  
        <artifactId>slf4j-api</artifactId>  
        <version>${slf4j.version}</version>  
    </dependency>  
    <dependency>  
        <groupId>org.slf4j</groupId>  
        <artifactId>slf4j-log4j12</artifactId>  
        <version>${slf4j.version}</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.logging.log4j</groupId>  
        <artifactId>log4j-to-slf4j</artifactId>  
        <version>2.14.0</version>  
    </dependency>  
</dependencies>
  1. 配置日志管理 在目录src/resource下添加文件log4j.properties,内容配置如下:
log4i.rootLogger=error,stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
Log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x -%m%n

批处理

DataSet API方式实现批处理:

import org.apache.flink.api.common.typeinfo.Types;  
import org.apache.flink.api.java.ExecutionEnvironment;  
import org.apache.flink.api.java.operators.AggregateOperator;  
import org.apache.flink.api.java.operators.DataSource;  
import org.apache.flink.api.java.operators.FlatMapOperator;  
import org.apache.flink.api.java.operators.UnsortedGrouping;  
import org.apache.flink.api.java.tuple.Tuple2;  
import org.apache.flink.util.Collector;  
  
public class BatchWordCount {  
    public static void main(String[] args) throws Exception {  
        // 1. 创建执行环境  
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();  
        // 2. 从文件读取数据  
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");  
        // 3。将每行数据进行分词,转换成二元组类型  
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {  
                    // 将一行文本进行分词  
                    String[] words = line.split(" ");  
                    // 将每个单词转换成二元组输出  
                    for (String word : words) {  
                        out.collect(Tuple2.of(word, 1L));  
                    }  
                })  
                .returns(Types.TUPLE(Types.STRING, Types.LONG));  
        // 4. 按照word进行分组  
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);  
        // 5. 分组内进行聚合统计  
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);  
        // 6. 打印结果  
        sum.print();  
    }  
}

有界流

DataStream方式实现WordCount:

import org.apache.flink.api.common.typeinfo.Types;  
import org.apache.flink.api.java.tuple.Tuple2;  
import org.apache.flink.streaming.api.datastream.DataStreamSource;  
import org.apache.flink.streaming.api.datastream.KeyedStream;  
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.util.Collector;  
  
import java.util.Collection;  
  
public class BoundedStreamWordCount {  
    public static void main(String[] args) throws Exception {  
        // 1.创建流式执行环境  
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // 2. 读取文件  
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");  
  
        // 3. 转换计算  
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {  
                    String[] words = line.split(" ");  
                    for (String word : words) {  
                        out.collect(Tuple2.of(word, 1L));  
  
                    }  
                })  
                .returns(Types.TUPLE(Types.STRING, Types.LONG));  
  
        // 4. 分组  
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);  
  
        // 5. 求和  
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);  
  
        // 6. 打印输出  
        sum.print();  
  
        // 7. 启动执行  
        env.execute();  
    }  
}

输出格式如下:

  • 输出格式:执行并行子任务的编号(最小单位Task Slot) > (单词,出现次数)
  • 输出特点
    • 默认任务并行度为电脑CPU线程总数
    • 多线程并行输出,所以不是顺序执行
    • 相同字数由同一任务统计(子任务不知道别的任务统计到了多少次)

无界流

思路:数据源源不断,因此保持监听 操作步骤:

  1. 先执行 nc -lk 7777 执行socket服务