[源码分析] 从实例和源码入手看 Flink 之广播 Broadcast

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

[源码分析] 从实例和源码入手看 Flink 之广播 Broadcast

罗西的思考   2020-03-29 我要评论
# [源码分析] 从实例和源码入手看 Flink 之广播 Broadcast ## 0x00 摘要 本文将通过源码分析和实例讲解,带领大家熟悉Flink的广播变量机制。 ## 0x01 业务需求 ### 1. 场景需求 对黑名单中的IP进行检测过滤。IP黑名单的内容会随时增减,因此是可以随时动态配置的。 该黑名单假设存在mysql中,Flink作业启动时候会把这个黑名单从mysql载入,作为一个变量由Flink算子使用。 ### 2. 问题 我们不想重启作业以便重新获取这个变量。所以就需要一个能够动态修改算子里变量的方法。 ### 3. 解决方案 使用广播的方式去解决。去做配置的动态更新。 广播和普通的流数据不同的是:广播流的1条流数据能够被算子的所有分区所处理,而数据流的1条流数据只能够被算子的某一分区处理。因此广播流的特点也决定适合做配置的动态更新。 ## 0x02 概述 广播这部分有三个难点:使用步骤;如何自定义函数;如何存取状态。下面就先为大家概述下。 ### 1. broadcast的使用步骤 - 建立MapStateDescriptor - 通过DataStream.broadcast方法返回广播数据流BroadcastStream - 通过DataStream.connect方法,把业务数据流和BroadcastStream进行连接,返回BroadcastConnectedStream - 通过BroadcastConnectedStream.process方法分别进行processElement及processBroadcastElement处理 ### 2. 用户自定义处理函数 - BroadcastConnectedStream.process接收两种类型的function:KeyedBroadcastProcessFunction 和 BroadcastProcessFunction - 两种类型的function都定义了processElement、processBroadcastElement抽象方法,只是KeyedBroadcastProcessFunction多定义了一个onTimer方法,默认是空操作,允许子类重写 - processElement处理业务数据流 - processBroadcastElement处理广播数据流 ### 3. Broadcast State - Broadcast State始终表示为MapState,即map format。这是Flink提供的最通用的状态原语。是托管状态的一种,托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。 - 用户必须创建一个 `MapStateDescriptor`,才能得到对应的状态句柄。 这保存了状态名称, 状态所持有值的类型,并且可能包含用户指定的函数 - checkpoint的时候也会checkpoint broadcast state - Broadcast State只在内存有,没有RocksDB state backend - Flink 会将state广播到每个task,注意该state并不会跨task传播,对其修改仅仅是作用在其所在的task - downstream tasks接收到broadcast event的顺序可能不一样,所以依赖其到达顺序来处理element的时候要小心 ## 0x03. 示例代码 ### 1. 示例代码 我们直接从Flink源码入手可以找到理想的示例。 以下代码直接摘录 Flink 源码 StatefulJobWBroadcastStateMigrationITCase,我会在里面加上注释说明。 ```scala @Test def testRestoreSavepointWithBroadcast(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 以下两个变量是为了确定广播流发出的数据类型,广播流可以同时发出多种类型的数据 lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long]( "broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]) lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String]( "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) env.setStateBackend(new MemoryStateBackend) env.enableCheckpointing(500) env.setParallelism(4) env.setMaxParallelism(4) // 数据流,这里数据流和广播流的Source都是同一种CheckpointedSource。数据流这里做了一系列算子操作,比如flatMap val stream = env .addSource( new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource") .keyBy( new KeySelector[(Long, Long), Long] { override def getKey(value: (Long, Long)): Long = value._1 } ) .flatMap(new StatefulFlatMapper) .keyBy( new KeySelector[(Long, Long), Long] { override def getKey(value: (Long, Long)): Long = value._1 } ) // 广播流 val broadcastStream = env .addSource( new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource") .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc) // 把数据流和广播流结合起来 stream .connect(broadcastStream) .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState)) .addSink(new AccumulatorCountingSink) } } // 用户自定义的处理函数 class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction [Long, (Long, Long), (Long, Long), (Long, Long)] { // 重点说明,这里的 firstBroadcastStateDesc,secondBroadcastStateDesc 其实和之前广播流的那两个MapStateDescriptor无关。 // 这里两个MapStateDescriptor是为了存取BroadcastState,这样在 processBroadcastElement和processElement之间就可以传递变量了。我们完全可以定义新的MapStateDescriptor,只要processBroadcastElement和processElement之间认可就行。 // 这里参数 "broadcast-state-1" 是name, flink就是用这个 name 来从Flink运行时系统中存取MapStateDescriptor lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long]( "broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]) val secondBroadcastStateDesc = new MapStateDescriptor[String, String]( "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) override def processElement( value: (Long, Long), ctx: KeyedBroadcastProcessFunction [Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext, out: Collector[(Long, Long)]): Unit = { // 这里Flink源码中是直接把接受到的业务变量直接再次转发出去 out.collect(value) } override def processBroadcastElement( value: (Long, Long), ctx: KeyedBroadcastProcessFunction [Long, (Long, Long), (Long, Long), (Long, Long)]#Context, out: Collector[(Long, Long)]): Unit = { // 这里是把最新传来的广播变量存储起来,processElement中可以取出再次使用. 具体是通过firstBroadcastStateDesc 的 name 来获取 BroadcastState ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2) ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString) } } // 广播流和数据流的Source private class CheckpointedSource(val numElements: Int) extends SourceFunction[(Long, Long)] with CheckpointedFunction { private var isRunning = true private var state: ListState[CustomCaseClass] = _ // 就是简单的定期发送 override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) { ctx.emitWatermark(new Watermark(0)) ctx.getCheckpointLock synchronized { var i = 0 while (i < numElements) { ctx.collect(i, i) i += 1 } } // don't emit a final watermark so that we don't trigger the registered event-time // timers while (isRunning) Thread.sleep(20) } } ``` ### 2. 技术难点 #### MapStateDescriptor 首先要说明一些概念: - Flink中包含两种基础的状态:Keyed State和Operator State。 - Keyed State和Operator State又可以 以两种形式存在:原始状态和托管状态。 - 托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。 - raw state即原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。 - MapState是托管状态的一种:即状态值为一个map。用户通过`put`或`putAll`方法添加元素。 回到我们的例子,广播变量就是OperatorState的一部分,是以托管状态的MapState形式保存的。具体getBroadcastState函数就是DefaultOperatorStateBackend中的实现 所以我们需要用MapStateDescriptor描述broadcast state,这里MapStateDescriptor的使用比较灵活,因为是key,value类似使用,所以个人觉得value直接使用类,这样更方便,尤其是对于从其他语言转到scala的同学。 #### processBroadcastElement ```scala // 因为主要起到控制作用,所以这个函数的处理相对简单 override def processBroadcastElement(): Unit = { // 这里可以把最新传来的广播变量存储起来,processElement中可以取出再次使用,比如 ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2) } ``` #### processElement ```java // 这个函数需要和processBroadcastElement配合起来使用 override def processElement(): Unit = { // 可以取出processBroadcastElement之前存储的广播变量,然后用此来处理业务变量,比如 val secondBroadcastStateDesc = new MapStateDescriptor[String, String]( "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) var actualSecondState = Map[String, String]() for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) { val v = secondExpectedBroadcastState.get(entry.getKey).get actualSecondState += (entry.getKey -> entry.getValue) } // 甚至这里只要和processBroadcastElement一起关联好,可以存储任意类型的变量。不必须要和广播变量的类型一致。重点是声明新的对应的MapStateDescriptor // MapStateDescriptor继承了StateDescriptor,其中state为MapState类型,value为Map类型 } ``` #### 结合起来使用 因为某些限制,所以下面只能从网上找一个例子给大家讲讲。 ```java // 模式始终存储在MapState中,并将null作为键。broadcast state始终表示为MapState,这是Flink提供的最通用的状态原语。 MapStateDescriptor

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们