DeprecatedConfig("spark.cleaner.ttl", "1.4",
  "TTL-based metadata cleaning is no longer necessary in recent Spark versions " +
  "and can lead to confusing errors if metadata is deleted for entities that are still" +
  " in use. Except in extremely special circumstances, you should remove this setting" +
  " and rely on Spark's reference-tracking-based cleanup instead." +
  " See SPARK-7689 for more details.")
At the same time, the community redesigned the RDD removal logic so that Spark can automatically clean up the metadata and data of persisted RDDs, along with shuffle and broadcast variable data, introducing the ContextCleaner class for this purpose. It is instantiated in SparkContext:
// Reference tracking is enabled by default; setting
// spark.cleaner.referenceTracking to false disables the ContextCleaner.
private[spark] val cleaner: Option[ContextCleaner] = {
  if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
}
cleaner.foreach(_.start())
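To see this reference-tracking cleanup from the user side, here is a minimal sketch (the object name and helper method are mine, and the explicit System.gc() call is only to make the effect visible quickly): once the last strong reference to a cached RDD is dropped and the JVM collects it, the ContextCleaner unpersists its blocks automatically.

import org.apache.spark.{SparkConf, SparkContext}

object CleanerDemo {
  def main(args: Array[String]): Unit = {
    // Reference tracking is on by default; set it explicitly for illustration.
    val conf = new SparkConf()
      .setAppName("context-cleaner-demo")
      .setMaster("local[2]")
      .set("spark.cleaner.referenceTracking", "true")
    val sc = new SparkContext(conf)

    def buildAndDrop(): Unit = {
      val rdd = sc.parallelize(1 to 1000).cache()
      rdd.count() // materialize the cached blocks
      // `rdd` goes out of scope here; only the cleaner's weak reference remains
    }
    buildAndDrop()

    // Nudge the JVM to collect the dropped RDD; the ContextCleaner's
    // weak-reference queue then yields a CleanRDD task and the blocks
    // are unpersisted without any user code.
    System.gc()
    Thread.sleep(1000)
    sc.stop()
  }
}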
Inside ContextCleaner, persisted RDD data is removed by calling sc.unpersistRDD() (the internal counterpart of RDD.unpersist()):
/** Perform RDD cleanup. */
def doCleanupRDD(rddId: Int, blocking: Boolean) {
  try {
    logDebug("Cleaning RDD " + rddId)
    sc.unpersistRDD(rddId, blocking)
    listeners.foreach(_.rddCleaned(rddId))
    logInfo("Cleaned RDD " + rddId)
  } catch {
    case t: Throwable => logError("Error cleaning RDD " + rddId, t)
  }
}
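For comparison, the same cleanup can be triggered by hand; a one-line sketch assuming an already persisted rdd, where the blocking flag decides whether the call waits until the blocks are actually removed:

rdd.unpersist(blocking = false) // fire-and-forget; pass true to block until removal completes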
Cleaning shuffle and broadcast data goes through the doCleanupShuffle and doCleanupBroadcast functions, respectively. The cleaner dispatches on the type of the cleanup task:
task match {
  case CleanRDD(rddId) =>
    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
  case CleanShuffle(shuffleId) =>
    doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
  case CleanBroadcast(broadcastId) =>
    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
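These cleanup tasks surface when the JVM garbage-collects the corresponding RDD, shuffle dependency, or broadcast variable. The following is a simplified, self-contained sketch of the underlying weak-reference pattern, not Spark's actual ContextCleaner code (names such as MiniCleaner and CleanupRef are mine):

import java.lang.ref.{ReferenceQueue, WeakReference}

sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask

// Wrap each tracked object in a WeakReference registered with a ReferenceQueue;
// the JVM enqueues the reference once the referent has been collected.
class CleanupRef(referent: AnyRef, val task: CleanupTask, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

class MiniCleaner {
  private val queue = new ReferenceQueue[AnyRef]
  // Hold strong references to the wrappers themselves, otherwise they could
  // be collected before they are ever enqueued.
  private val buffer = new java.util.concurrent.ConcurrentLinkedQueue[CleanupRef]

  def registerRDDForCleanup(rdd: AnyRef, rddId: Int): Unit =
    buffer.add(new CleanupRef(rdd, CleanRDD(rddId), queue))

  private val cleaningThread = new Thread("mini-cleaner") {
    override def run(): Unit = while (true) {
      val ref = queue.remove().asInstanceOf[CleanupRef] // blocks until a referent dies
      buffer.remove(ref)
      ref.task match {
        case CleanRDD(id) => println(s"would call doCleanupRDD($id)")
      }
    }
  }
  cleaningThread.setDaemon(true)

  def start(): Unit = cleaningThread.start()
}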
With this logic in place, Spark's RDD cleanup should become considerably smarter. Something to look forward to.