# tumbling window windowedCountsDF = \ eventsDF \ .withWatermark("eventTime", "10 minutes") \ .groupBy(“deviceId”, window("eventTime", "10 minutes") \ .count() # sliding window windowedCountsDF = \ eventsDF \ .withWatermark("eventTime", "10 minutes") \ .groupBy(“deviceId”, window("eventTime", "10 minutes", "5 minutes")) \ .count()
可以将 window 替换成 session_window 实现会话窗口:
# session window windowedCountsDF = \ eventsDF \ .withWatermark("eventTime", "10 minutes") \ .groupBy("deviceId", session_window("eventTime", "5 minutes")) \ .count()
会话窗口除了具有相同的跨会话间隔持续时间之外,还有另一种类型的会话窗口,每个会话具有不同的间隔持续时间。 我们称之为“动态间隙持续时间(dynamic gap duration)”。
时间线下方的框表示每个事件及其间隔持续时间。一共有有四个事件,它们的(事件时间,间隔持续时间)对分别是(12:04,4 分钟)蓝色,(12:06,9 分钟)橙色,(12:09,5 分钟)黄色,以及( 12:15,5 分钟)绿色。
时间线上方的框表示由这些事件构成的实际会话。您可以将每个事件视为一个单独的会话,并将具有交集的会话合并为一个。正如您看到的,会话的时间范围是会话中包含的所有事件的时间范围的“联合”。 请注意,会话的结束时间不再是会话中最新事件的时间 + 间隙持续时间。
新的 session_window 函数接收两个参数,事件时间列和间隙持续时间。
对于动态会话窗口,您可以在“session_window”函数中为“间隙持续时间”参数提供一个“表达式”。这个表达式应该解析为一个间隔,比如“5分钟”。由于“间隙持续时间”参数接收一个表达式,我们也可以使用 UDF。
例如,基于 eventType 列的动态间隙持续时间的会话窗口计数可以实现如下:
# Define the session window having dynamic gap duration based on eventType session_window expr = session_window(events.timestamp, \ when(events.eventType == "type1", "5 seconds") \ .when(events.eventType == "type2", "20 seconds") \ .otherwise("5 minutes")) # Group the data by session window and userId, and compute the count of each group windowedCountsDF = events \ .withWatermark("timestamp", "10 minutes") \ .groupBy(events.userID, session_window_expr) \ .count()
flatMapGroupsWithState 为实现会话窗口提供了更大的灵活性,但它需要用户编写一堆代码。例如,请参考 Apache Spark 上的 sessionization 示例,它通过 flatMapGroupsWithState 实现会话窗口。 请注意,Apache Spark 上的会话示例非常简化,仅适用于处理时间和附加模式。 通过原生支持会话窗口,处理事件时间和各种输出模式的总体复杂性被抽象了出来。
Spark 将原生支持会话窗口设置为覆盖一般用例的目标,因为它使 Spark 能够优化性能和状态存储。当我们的业务用例需要一个复杂的会话窗口时,我们可能仍然需要利用 flatMapGroupsWithState,例如,如果用例会话也应该在特定类型的事件上关闭,而不管是否处于不活动状态。
我们已经介绍了流聚合中如何使用会话窗口,它也适用于批处理查询。通过学习如何使用新的 session_window 函数',我们可以利用流数据聚合与时间窗口的知识,并能够处理会话窗口。这也使 SQL/PySpark 用户能够处理会话窗口,因为 flatMapGroupsWithState API 在 PySpark 中不可用并且不能表示为 SQL 语句。
当然,时间窗口的操作还有很多空间可以改进,在此之前还是需要使用 flatMapGroupsWithState API,社区计划在不久的将来研究自定义窗口操作。
本文翻译自:Native Support of Session Window in Spark Structured Streaming
本博客文章除特别声明,全部都是原创!