[源码分析] 从FlatMap用法到Flink的内部实现

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

[源码分析] 从FlatMap用法到Flink的内部实现

罗西的思考   2020-03-30 我要评论
# [源码分析] 从FlatMap用法到Flink的内部实现 ## 0x00 摘要 本文将从FlatMap概念和如何使用开始入手,深入到Flink是如何实现FlatMap。希望能让大家对这个概念有更深入的理解。 ## 0x01 Map vs FlatMap 首先我们先从概念入手。 自从*响应式编程*慢慢壮大以来,这两个单词现在越来越被大家熟悉了。前端能见到它们的身影,后台也能见到;安卓里面有,iOS也有。很多兄弟刚遇到它们时候是懵圈的,搞不清楚之间的区别。下面我就给大家简单讲解下。 ### map 它把`数组流`中的每一个值,使用所提供的函数执行一遍,一一对应。得到与元素个数相同的`数组流`。然后返回这个新数据流。 ### flatMap flat是扁平的意思。所以这个操作是:先映射(map),再拍扁(join)。 flatMap输入可能是多个`子数组流`。所以flatMap先针对 每个`子数组流`的每个元素进行映射操作。然后进行扁平化处理,最后汇集所有进行扁平化处理的结果集形成一个新的列表(扁平化简而言之就是去除所有的修饰)。 flatMap与map另外一个不一样的地方就是传入的函数在处理完后返回值必须是List。 ### 实例 比如拿到一个文本文件之后,我们是按行读取,按行处理。如果要对每一行的单词数进行计数,那么应该选择Map方法,如果是统计词频,就应该选择flatMap方法。 如果还不清楚,可以看看下面这个例子: ```scala 梁山新进一批好马,准备给每个马军头领配置一批。于是得到函数以及头领名单如下: 函数 = ( 头领 => 头领 + 好马 ) 五虎将 = List(关胜、林冲、秦明、呼延灼、董平 ) 八骠骑 = List(花荣、徐宁、杨志、索超、张清、朱仝、史进、穆弘 ) // Map函数的例子 利用map函数,我们可以得到 五虎将马军 五虎将马军 = 五虎将.map( 头领 => 头领 + 好马 ) 结果是 List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马 ) // flatMap函数的例子 但是为了得到统一的马军,则可以用flatMap: 马军头领 = List(五虎将,八骠骑) 马军 = 马军头领.flatMap( 头领 => 头领 + 好马 ) 结果就是:List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马,花荣 + 马、徐宁 + 马、杨志 + 马、索超 + 马、张清 + 马、朱仝 + 马、史进 + 马、穆弘 + 马 ) ``` 现在大家应该清楚了吧。接下来看看几个FlatMap的实例。 ### Scala语言的实现 Scala本身对于List类型就有map和flatMap操作。举例如下: ```scala val names = List("Alice","James","Apple") val strings = names.map(x => x.toUpperCase) println(strings) // 输出 List(ALICE, JAMES, APPLE) val chars = names.flatMap(x=> x.toUpperCase()) println(chars) // 输出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E) ``` ### Flink的例子 以上是scala语言层面的实现。下面我们看看Flink框架是如何使用FlatMap的。 网上常见的一个Flink应用的例子: ```scala //加载数据源 val source = env.fromElements("china is the best country","beijing is the capital of china") //转化处理数据 val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) ``` ### Flink源码中的例子 ```scala case class WordWithCount(word: String, count: Long) val text = env.socketTextStream(host, port, '\n') val windowCounts = text.flatMap { w => w.split("\\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") windowCounts.print() ``` ## 0x02 自定义算子(in Flink) 上面提到的都是简单的使用,如果有复杂需求,在Flink中,我们可以通过继承FlatMapFunction和RichFlatMapFunction来自定义算子。 ### 函数类`FlatMapFunction` 对于不涉及到状态的使用,可以直接继承 FlatMapFunction,其定义如下: ```java @Public @FunctionalInterface public interface FlatMapFunction

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们