# tumbling window windowedCountsDF = \ eventsDF \ .withWatermark("eventTime", "10 minutes") \ .groupBy(“deviceId”, window("eventTime", "10 minutes") \ .count() # sliding window windowedCountsDF = \ eventsDF \ .withWatermark("eventTime", "10 minutes") \ .groupBy(“deviceId”, window("eventTime", "10 minutes", "5 minutes")) \ .count()

可以将 window 替换成 session_window 实现会话窗口:

# session window
windowedCountsDF = \
  eventsDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy("deviceId", session_window("eventTime", "5 minutes")) \
    .count()

具有动态间隙持续时间的会话窗口

会话窗口除了具有相同的跨会话间隔持续时间之外,还有另一种类型的会话窗口,每个会话具有不同的间隔持续时间。 我们称之为“动态间隙持续时间(dynamic gap duration)”。


如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:过往记忆大数据

时间线下方的框表示每个事件及其间隔持续时间。一共有有四个事件,它们的(事件时间,间隔持续时间)对分别是(12:04,4 分钟)蓝色,(12:06,9 分钟)橙色,(12:09,5 分钟)黄色,以及( 12:15,5 分钟)绿色。

时间线上方的框表示由这些事件构成的实际会话。您可以将每个事件视为一个单独的会话,并将具有交集的会话合并为一个。正如您看到的,会话的时间范围是会话中包含的所有事件的时间范围的“联合”。 请注意,会话的结束时间不再是会话中最新事件的时间 + 间隙持续时间。

新的 session_window 函数接收两个参数,事件时间列和间隙持续时间。

对于动态会话窗口,您可以在“session_window”函数中为“间隙持续时间”参数提供一个“表达式”。这个表达式应该解析为一个间隔,比如“5分钟”。由于“间隙持续时间”参数接收一个表达式,我们也可以使用 UDF。

例如,基于 eventType 列的动态间隙持续时间的会话窗口计数可以实现如下:

# Define the session window having dynamic gap duration based on eventType
session_window expr = session_window(events.timestamp, \
    when(events.eventType == "type1", "5 seconds") \
    .when(events.eventType == "type2", "20 seconds") \
    .otherwise("5 minutes"))

# Group the data by session window and userId, and compute the count of each group
windowedCountsDF = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(events.userID, session_window_expr) \
    .count()

原生支持会话窗口与 FlatMapGroupsWithState 比较

flatMapGroupsWithState 为实现会话窗口提供了更大的灵活性,但它需要用户编写一堆代码。例如,请参考 Apache Spark 上的 sessionization 示例,它通过 flatMapGroupsWithState 实现会话窗口。 请注意,Apache Spark 上的会话示例非常简化,仅适用于处理时间和附加模式。 通过原生支持会话窗口,处理事件时间和各种输出模式的总体复杂性被抽象了出来。

Spark 将原生支持会话窗口设置为覆盖一般用例的目标,因为它使 Spark 能够优化性能和状态存储。当我们的业务用例需要一个复杂的会话窗口时,我们可能仍然需要利用 flatMapGroupsWithState,例如,如果用例会话也应该在特定类型的事件上关闭,而不管是否处于不活动状态。

总结

我们已经介绍了流聚合中如何使用会话窗口,它也适用于批处理查询。通过学习如何使用新的 session_window 函数',我们可以利用流数据聚合与时间窗口的知识,并能够处理会话窗口。这也使 SQL/PySpark 用户能够处理会话窗口,因为 flatMapGroupsWithState API 在 PySpark 中不可用并且不能表示为 SQL 语句。

当然,时间窗口的操作还有很多空间可以改进,在此之前还是需要使用 flatMapGroupsWithState API,社区计划在不久的将来研究自定义窗口操作。

本文翻译自:Native Support of Session Window in Spark Structured Streaming

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache Spark 3.2 内置支持会话窗口】(https://www.iteblog.com/archives/10063.html)
喜欢 (0)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!