通过Spark内置机制关闭

  其实Spark内置就为我们提供了一种优雅的方法来关闭长期运行的Streaming作业,我们来看看 StreamingContext 类中定义的一个 stop 方法:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean)

官方文档对其解释是:Stop the execution of the streams, with option of ensuring all received data has been processed. 控制所有接收的数据是否被处理的参数就是 stopGracefully,如果我们将它设置为true,Spark则会等待所有接收的数据被处理完成,然后再关闭计算引擎,这样就可以避免数据的丢失。现在的问题是我们在哪里调用这个stop方法?

Spark 1.4版本之前

在Spark 1.4版本之前,我们需要手动调用这个 stop 方法,一种比较合适的方式是通过 Runtime.getRuntime().addShutdownHook 来添加一个钩子,其会在JVM关闭的之前执行传递给他的函数,如下:

Runtime.getRuntime().addShutdownHook(new Thread() {
  override def run() {
    log("Gracefully stop Spark Streaming")
    streamingContext.stop(true, true)
  }
})

如果你使用的是Scala,我们还可以通过以下的方法实现类似的功能:

scala.sys.addShutdownHook({
  streamingContext.stop(true,true)
)})

通过上面的办法,我们客户确保程序退出之前会执行上面的函数,从而保证Streaming程序关闭的时候不丢失数据。

Spark 1.4版本之后

上面方式可以达到我们的需求,但是在每个程序里面都添加这样的重复代码也未免太过麻烦了!值得高兴的是,从Apache Spark 1.4版本开始,Spark内置提供了spark.streaming.stopGracefullyOnShutdown参数来决定是否需要以Gracefully方式来关闭Streaming程序(详情请参见SPARK-7776)。Spark会在启动 StreamingContext 的时候注册这个钩子,如下:

shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

private def stopOnShutdown(): Unit = {
    val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
    logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
    // Do not stop SparkContext, let its own shutdown hook stop it
    stop(stopSparkContext = false, stopGracefully = stopGracefully)
}

从上面的代码可以看出,我们可以根据自己的需求来设置 spark.streaming.stopGracefullyOnShutdown 的值,而不需要在每个Streaming程序里面手动调用StreamingContext的stop方法,确实方便多了。不过虽然这个参数在Spark 1.4开始引入,但是却是在Spark 1.6才开始才有文档正式介绍(可以参见https://github.com/apache/spark/pull/8898和http://spark.apache.org/docs/1.6.0/configuration.html)

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【如何优雅地终止正在运行的Spark Streaming程序】(https://www.iteblog.com/archives/1890.html)
喜欢 (11)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(1)个小伙伴在吐槽
  1. 我想问一下,我添加了这个配置,在程序执行的时候,执行kill -9 applicationID ,程序就直接停掉了呢,并没有执行完我的程序

    发呆的白水2020-12-02 16:04 回复