Apache Flink: Juggling with Bits and Bytes):

1. BasicTypeInfo: 任意Java基本类型(装包或未装包)和String类型。

2. BasicArrayTypeInfo: 任意Java基本类型数组(装包或未装包)和String数组。

3. WritableTypeInfo: 任意Hadoop’s Writable接口的实现类.

4. TupleTypeInfo: 任意的Flink tuple类型(支持Tuple1 to Tuple25). Flink tuples是固定长度固定类型的Java Tuple实现。

5. CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples).

6. PojoTypeInfo: 任意的POJO (Java or Scala),例如,Java对象的所有成员变量,要么是public修饰符定义,要么有getter/setter方法。

7. GenericTypeInfo: 任意无法匹配之前几种类型的类。)

前6种类型数据集几乎覆盖了绝大部分的Flink程序,针对前6种类型数据集,Flink皆可以自动生成对应的TypeSerializer定制序列化工具,非常有效率的对数据集进行序列化和反序列化。对于第7中类型,Flink使用Kryo进行序列化和反序列化。此外,对于可被用作Key的类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据直接进行compare,hash等之类的操作。对于Tuple,CaseClass,Pojo等组合类型,Flink自动生成的TypeSerializer,TypeComparator同样是组合的,并把其成员的序列化/反序列化代理给其成员对应的TypeSerializer,TypeComparator,如下图所示:

图2 Flink组合类型序列化

此外,如有需要,用户可通过集成TypeInformation接口,定制实现自己的序列化工具。

显式的内存管理

垃圾回收的JVM内存管理回避不了的问题,JDK8的G1算法改善了JVM垃圾回收的效率和可用范围,但对于大数据处理的实际环境中,还是远远不够。这也和现在分布式框架的发展趋势有冲突,越来越多的分布式计算框架希望尽可能多的将待处理的数据集放在内存中,而对于JVM垃圾回收来说,内存中Java对象越少,存活时间越短,其效率越高。通过JVM进行内存管理的话,OutOfMemoryError也是一个很难解决的问题。同时,在JVM内存管理中,Java对象有潜在的碎片化存储问题(Java对象所有信息可能不是在内存中连续存储),也有可能在所有Java对象大小没有超过JVM分配内存时,出现OutOfMemoryError问题。

Flink的内存管理

Flink将内存分为三个部分,每个部分都有不同的用途:

1. Network buffers: 一些以32KB Byte数组为单位的buffer,主要被网络模块用于数据的网络传输。

2. Memory Manager pool: 大量以32KB Byte数组为单位的内存池,所有的运行时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存,并将序列化后的数据存储其中,结束后释放回内存池。

3. Remaining (Free) Heap: 主要留给UDF中用户自己创建的Java对象,由JVM管理。

Network buffers在Flink中主要基于Netty的网络传输,无需多讲。Remaining Heap用于UDF中用户自己创建的Java对象,在UDF中,用户通常是流式的处理数据,并不需要很多内存,同时Flink也不鼓励用户在UDF中缓存很多数据,因为这会引起前面提到的诸多问题。Memory Manager pool(以后以内存池代指)通常会配置为最大的一块内存,接下来会详细介绍。

在Flink中,内存池由多个MemorySegment组成,每个MemorySegment代表一块连续的内存,底层存储是byte[],默认32KB大小。MemorySegment提供了根据偏移量访问数据的各种方法,如get/put int,long,float,double等,MemorySegment之间数据拷贝等方法,和java.nio.ByteBuffer类似。对于Flink的数据结构,通常包括多个向内存池申请的MemeorySegment,所有要存入的对象,通过TypeSerializer序列化之后,将二进制数据存储在MemorySegment中,在取出时,通过TypeSerializer反序列化。数据结构通过MemorySegment提供的set/get方法访问具体的二进制数据。

Flink这种看起来比较复杂的内存管理方式带来的好处主要有:

1. 二进制的数据存储大大提高了数据存储密度,节省了存储空间。

2. 所有的运行时数据结构和算法只能通过内存池申请内存,保证了其使用的内存大小是固定的,不会因为运行时数据结构和算法而发生OOM。而对于大部分的分布式计算框架来说,这部分由于要缓存大量数据,是最有可能导致OOM的地方。

3. 内存池虽然占据了大部分内存,但其中的MemorySegment容量较大(默认32KB),所以内存池中的Java对象其实很少,而且一直被内存池引用,所有在垃圾回收时很快进入持久代,大大减轻了JVM垃圾回收的压力。

4. Remaining Heap的内存虽然由JVM管理,但是由于其主要用来存储用户处理的流式数据,生命周期非常短,速度很快的Minor GC就会全部回收掉,一般不会触发Full GC。

Flink当前的内存管理在最底层是基于byte[],所以数据最终还是on-heap,最近Flink增加了off-heap的内存管理支持,将会在下一个release中正式出现。Flink off-heap的内存管理相对于on-heap的优点主要在于(更多细节,请参考Apache Flink: Off-heap Memory in Apache Flink and the curious JIT compiler):

1. 启动分配了大内存(例如100G)的JVM很耗费时间,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都会很小,垃圾回收也不用考虑MemorySegment中的Java对象了。

2. 更有效率的IO操作。在off-heap下,将MemorySegment写到磁盘或是网络,可以支持zeor-copy技术,而on-heap的话,则至少需要一次内存拷贝。

3. off-heap可用于错误恢复,比如JVM崩溃,在on-heap时,数据也随之丢失,但在off-heap下,off-heap的数据可能还在。此外,off-heap上的数据还可以和其他程序共享。

Spark的内存管理

Spark的off-heap内存管理与Flink off-heap模式比较相似,也是通过Java UnSafe API直接访问off-heap内存,通过定制的序列化工具将序列化后的二进制数据存储与off-heap上,Spark的数据结构和算法直接访问和操作在off-heap上的二进制数据。Project Tungsten是一个正在进行中的项目,想了解具体进展可以访问:[SPARK-7075] Project Tungsten (Spark 1.5 Phase 1)[SPARK-9697] Project Tungsten (Spark 1.6)

缓存友好的计算

磁盘IO和网络IO之前一直被认为是Hadoop系统的瓶颈,但是随着Spark,Flink等新一代的分布式计算框架的发展,越来越多的趋势使得CPU/Memory逐渐成为瓶颈,这些趋势包括:

1. 更先进的IO硬件逐渐普及。10GB网络和SSD硬盘等已经被越来越多的数据中心使用。

2. 更高效的存储格式。Parquet,ORC等列式存储被越来越多的Hadoop项目支持,其非常高效的压缩性能大大减少了落地存储的数据量。

3. 更高效的执行计划。例如Spark DataFrame的执行计划优化器的Fliter-Push-Down优化会将过滤条件尽可能的提前,甚至提前到Parquet的数据访问层,使得在很多实际的工作负载中,并不需要很多的磁盘IO。

由于CPU处理速度和内存访问速度的差距,提升CPU的处理效率的关键在于最大化的利用L1/L2/L3/Memory,减少任何不必要的Cache miss。定制的序列化工具给Spark和Flink提供了可能,通过定制的序列化工具,Spark和Flink访问的二进制数据本身,因为占用内存较小,存储密度比较大,而且还可以在设计数据结构和算法时,尽量连续存储,减少内存碎片化对Cache命中率的影响,甚至更进一步,Spark与Flink可以将需要操作的部分数据(如排序时的Key)连续存储,而将其他部分的数据存储在其他地方,从而最大可能的提升Cache命中的概率。

Flink中的数据结构

以Flink中的排序为例,排序通常是分布式计算框架中一个非常重的操作,Flink通过特殊设计的排序算法,获得了非常好了性能,其排序算法的实现如下:

1. 将待排序的数据经过序列化后存储在两个不同的MemorySegment集中。数据全部的序列化值存放于其中一个MemorySegment集中。数据序列化后的Key和指向第一个MemorySegment集中其值的指针存放于第二个MemorySegment集中。

2. 对第二个MemorySegment集中的Key进行排序,如需交换Key位置,只需交换对应的Key+Pointer的位置,第一个MemorySegment集中的数据无需改变。 当比较两个Key大小时,TypeComparator提供了直接基于二进制数据的对比方法,无需反序列化任何数据。

3. 排序完成后,访问数据时,按照第二个MemorySegment集中Key的顺序访问,并通过Pinter值找到数据在第一个MemorySegment集中的位置,通过TypeSerializer反序列化成Java对象返回。

图3 Flink排序算法

这样实现的好处有:

1. 通过Key和Full data分离存储的方式,尽量将被操作的数据最小化,提高Cache命中的概率,从而提高CPU的吞吐量。

2. 移动数据时,只需移动Key+Pointer,而无须移动数据本身,大大减少了内存拷贝的数据量。

3. TypeComparator直接基于二进制数据进行操作,节省了反序列化的时间。

Spark的数据结构

Spark中基于off-heap的排序与Flink几乎一模一样,在这里就不多做介绍了,感兴趣的话,请参考:databricks.com/blog/201

总结

本文主要介绍了Hadoop生态圈的一些项目遇到的一些因为JVM内存管理导致的问题,以及社区是如何应对的。基本上,以内存为中心的分布式计算框架,大都开始了部分脱离JVM,走上了自己管理内存的路线,Project Tungsten甚至更进一步,提出了通过LLVM,将部分逻辑编译成本地代码,从而更加深入的挖掘SIMD等CPU潜力。此外,除了Spark,Flink这样的分布式计算框架,HBase(HBASE-11425),HDFS(HDFS-7844)等项目也在部分性能相关的模块通过自己管理内存来规避JVM的一些缺陷,同时提升性能。

参考

  1. project tungsten:project-tungsten-bringing-spark-closer-to-bare-metal, http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen

  2. The "Memory Wall": Modern Microprocessors

  3. flink memory management: Apache Flink: Juggling with Bits and Bytes

  4. java GC:Tuning Java Garbage Collection for Spark Applications

  5. Project Valhalla: OpenJDK: Valhalla

  6. java object size: dweiss/java-sizeof · GitHub

  7. Big Data Performance Engineering

  本文转载自:http://zhuanlan.zhihu.com/hadoop/20228397
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【脱离JVM? Hadoop生态圈的挣扎与演化】(https://www.iteblog.com/archives/1521.html)
喜欢 (9)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!