详细统计信息
统计信息页面显示了包括输入/处理速率、延迟和详细操作持续时间在内的指标,这对于洞察流查询的状态非常有用,使我们能够轻松地调试流作业运行过程中的异常情况。页面如下所示:
上图包含以下的监控信息:
在这一小节,让我们来看看如何使用 Structured Streaming 新的 UI 来进行异常排除。我们的测试代码如下:
import java.util.UUID val bootstrapServers = ... val topics = ... val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topics) .load() .selectExpr("CAST(value AS STRING)") .as[String] val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("console") .option("checkpointLocation", checkpointLocation) .start()
在第一种情况下,我们运行查询来尽快处理 Apache Kafka 中读取的数据。在每批中,流作业将处理 Kafka 中所有可用的数据。如果我们的资源不足以快速处理当前批次的数据,那么延迟将迅速增加。最直观的判断是输入行和批处理持续时间会呈线性增长。处理速率(Process Rate)提示流作业最多只能处理大约8,000条记录/秒。但是当前的输入速率大约是每秒 20,000 条记录。我们可以为流作业提供更多的执行资源,或者添加足够的分区来处理这些数据。
这种情况相比第一种情况是处理延迟没有持续增加,具体如下所示:
我们发现在相同的输入速率(Input Rate)下,处理速率(Process Rate)可以保持稳定。这意味着作业的处理能力足以处理输入数据。然而,每批处理的进程持续时间(即延迟)仍然高达20秒。高延迟的主要原因是每个批处理中有太多数据需要处理。通常我们可以通过增加作业的并行性来减少延迟。在为 Spark 任务添加了10个Kafka分区和10个核心之后,我们发现延迟大约为5秒——比20秒要好得多。
操作持续时间(Operation Duration)图以毫秒为单位显示执行各种操作所花费的时间。这对于了解每个批次的时间分布并简化故障排除很有用。 让我们以Apache Spark 社区中的性能改进 SPARK-30915 为例进行说明。
在 SPARK-30915 工作之前,当压缩后的元数据日志变得很大时,压缩后的下一批处理要比其他批处理花费更多的时间。
经过对代码进行分析之后,发现并修复了不必要的读取压缩日志文件的问题,也就是 SPARK-30915 解决的,下图的运行时间确认了我们预期的效果:
通过上面三个案例,新的 Structured Streaming UI 将帮助开发人员通过更加有用的流查询信息来更好地监视流作业。作为早期发布版本,新的 UI 仍在开发中,并将在以后的版本中进行改进,包括但不限于以下功能:
本文翻译自:A look at the new Structured Streaming UI in Apache Spark™ 3.0
本博客文章除特别声明,全部都是原创!