上面是一张MapReduce读取一个文本数据的逻辑顺序处理图。我们知道,不管是本地运行还是集群模式下,最终以job的任务调度形式运行,主要分为两个阶段
比如在wordcount案例中,一段文本数据,在map阶段首先被解析,拆分成一个个的单词,其实对hadoop来说,这项工作的完成,是由背后开启的一个MapTask进行处理的,等job处理完成,看到在目标文件夹下,生成了对应的单词统计结果
如果有多个单词统计文本文件要处理呢?我们不妨改造下wordcount的job代码,在一个目录下放多个处理文件,看运行完毕的结果如何呢?
public static void main(String[] args) throws Exception { //1、获取job Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //2、设置jar路径 job.setJarByClass(DemoJobDriver.class); //3、关联mapper 和 Reducer job.setMapperClass(DemoMapper.class); job.setReducerClass(DemoReducer.class); //4、设置 map输出的 key/val 的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置最终输出的key / val 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置最终的输出路径 String inputPath = "F:\\网盘\\csv\\combines\\"; String outPath = "F:\\网盘\\csv\\result"; FileInputFormat.setInputPaths(job,new Path(inputPath)); FileOutputFormat.setOutputPath(job,new Path(outPath)); // 7 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }
运行完毕后,我们随机截取几张运行中的日志,通过阅读关键信息,相信感兴趣的同学能看出点什么吧,总结下来
使用hadoop或者其他大数据框架的一个很重要的原因在于,它们的底层设计能很好的支撑任务的并行化处理,也就是说,会充分利用服务器的配置将一个复杂的任务,或者单个Task处理起来很耗时的任务根据需要拆分成多个并行的子任务来处理,充分利用服务器性能,达到任务处理的最优,耗时最短,机器性能利用率最佳
hadoop同样如此,提供了很多配置参数提供客户端选择,从而提升任务的处理性能
我们知道,hadoop的任务处理主要分为2个阶段,Map和Reduce阶段,默认情况下,结合上面的案例可以知道,将会根据文件个数默认开启等量的MapTask去处理,但是我们设想这样一个问题,现在的文件比较小,尚未超过默认的一个blocksize ,即128M,如果超出了怎么办?甚至说这个文件达到1个G怎么办?
于是得出如下结论:
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度
这样说来,当要处理的某个文件特别大的时候,通过设置MapTask的并行度是可以提升整个Map阶段的处理速度的
思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
设想一段300MB大小的文件,假如按照100MB为一个切片的话,可以分为3个切片,这样Map阶段将会开启3个MapTask来处理这个任务,但是默认情况下,Hadoop处理的一个文件块即block的size大小为128Mb,那么问题来了,假如在生产环境下,hadoop真正在分布式环境下运行,任务往往分布在不同的机器上运行的,如下图所示
以上按照直观的理解,可以归纳出上面几点,但仔细分析下,会发现另一个问题就是,切片规则是客户端人为指定的规则,可以理解为一个账本,上面记录了工人干活时的工时,从而为结工资的时候做考量
但是对于3个节点来说,它们可不这么想了,因为它们是真正干活即执行任务的,人家管你是什么切片规则呢?总不能按照你的100MB大小的切片规则将自己的默认的128MB的数据库大小也改为100MB吧?这显然是不可能的,那该怎么办呢?
既然你切片上的规则是100MB嘛,于是node1节点就按照你的规则来,我这个节点上就处理100MB大小的数据就完事了,还剩下28MB大小的数据怎么办?既然分成了3个切片,肯定要开启3个MapTask了,node2节点也要处理一个任务了,但是不能随意就处理数据啊,得先把node1节点上面那个28MB的未处理完毕的文件拷贝过来,再拼接出72MB大小的数据块,凑够100MB了再搞事
于是,如果在真正的分布式环境下,这样就存在一个数据文件的跨节点拷贝问题,这很显然会带来一部分的网络开销,如果数据文件较大话,这个性能损耗就很值得考虑了
按照以上理解,我们可总结出如下经验:
默认情况下,不做任何设置的话,hadoop将采用FileInputFormat切片机制,简单来说,原理如下:
这个相对来说,比较简单,就不再过多赘述了,可以通过源码调试,找到下面的writeNewSplits 方法,进去看看源码的做法
FileInputFormat实现类
在编写job的main程序中,还记得最后设置读取文件和输出文件的两行代码
在运行MapReduce程序时,输入的文件格式有很多种,比如:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
最常见的就是TextInputFormat
以下是一个示例,比如,一个分片包含了如下4条文本记录
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
每条记录可以表示为以下键/值对:
(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)
CombineTextInputFormat切片机制
从上面的分析来看,框架默认的TextInputFormat切片机制,是对任务按文件规划切片,不管文件多小,都会作为一个单独的切片,交给一个MapTask,假如有大量小文件,就会产生大量的MapTask,从而处理效率上并不高
于是就可以考虑另一种切片机制,即CombineTextInputFormat
CombineTextInputFormat应用场景
CombineTextInputFormat 用于小文件过多的场景,它可将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理
比如上面几个文件,最大的只有不到7MB,最下的只有不到2MB,那么基于CombineTextInputFormat的切片机制,可以考虑使用这种切片来做,具体的设置在job任务的代码中按照下面这样做设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m 或者其他数据
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值,这个可以按照总体文件的大小,获取一个中位数比较好
CombineTextInputFormat案例代码演示
使用上面的4个文件作为输入数据源,期望只需要使用一个切片处理4个文件(默认情况下,4个文件将会启动4个切片和4个MapTask,从控制台日志中观察)
使用CombineTextInputFormat的切片,大概如下面的实现过程
// 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置20m CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
通过控制台的输出结果,验证了上面的目标猜想