如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

  这种不一致产生的原因是因为两个系统无法对那些已经接收到的数据信息保存进行原子操作。为了解决这个问题,只需要一个系统来维护那些已经发送或接收的一致性视图,而且,这个系统需要拥有从失败中恢复的一切控制权利。基于这些考虑,社区决定将所有的消费偏移量信息只存储在Spark Streaming中,并且使用Kafka的低层次消费者API来从任意位置恢复数据。

  为了构建这个系统,新引入的Direct API采用完全不同于Receivers和WALs的处理方式。它不是启动一个Receivers来连续不断地从Kafka中接收数据并写入到WAL中,而且简单地给出每个batch区间需要读取的偏移量位置,最后,每个batch的Job被运行,那些对应偏移量的数据在Kafka中已经准备好了。这些偏移量信息也被可靠地存储(checkpoint),在从失败中恢复可以直接读取这些偏移量信息。

  需要注意的是,Spark Streaming可以在失败以后重新从Kafka中读取并处理那些数据段。然而,由于仅处理一次的语义,最后重新处理的结果和没有失败处理的结果是一致的。

  因此,Direct API消除了需要使用WAL和Receivers的情况,而且确保每个Kafka记录仅被接收一次并被高效地接收。这就使得我们可以将Spark Streaming和Kafka很好地整合在一起。总体来说,这些特性使得流处理管道拥有高容错性,高效性,而且很容易地被使用。

如何来使用

  新的API相比之前的更加容易使用:

// Define the Kafka parameters, broker list must be specified
val kafkaParams = Map("metadata.broker.list" -> "www.iteblog.com:9092,anotherhost:9092")

// Define which topics to read from
val topics = Set("sometopic", "iteblog")

// Create the direct stream with the Kafka parameters and topics
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, 
  StringDecoder](streamingContext, kafkaParams, topics)

  因为新的API没有任何receivers,所以你不需要担心如何创建多个DStreams输入流以便创建多个receivers,而且你也不需要考虑每个receivers需要处理Kafka的分区数量。每个Kafka分区将会被自动地并行处理。而且,每个Kafka分区将会对应RDD中的分区,这使得并行模型变得更加简单。

  除了新的Streaming API,社区同时也引入了KafkaUtils.createRDD(),这个API可以对Kafka中的数据运行多个Job。

// Define the offset ranges to read in the batch job
val offsetRanges = Array(
  OffsetRange("some-topic", 0, 110, 220),
  OffsetRange("some-topic", 1, 100, 313),
  OffsetRange("another-topic", 0, 456, 789)
)

// Create the RDD based on the offset ranges
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, 
  StringDecoder](sparkContext, kafkaParams, offsetRanges)

Python 中的Kafka API

  在Spark 1.2中,基本的用于操作Spark Streaming中数据的Python API被引入,所以开发者可以使用纯Python来开发分布式地流处理应用程序。在Spark 1.3中,Spark Streaming流处理引入了Kafka操作API,有了这个API,在Python处理Kafka数据变得非常地容易。下面是简单的代码片段:

kafkaStream = KafkaUtils.createStream(streamingContext, 
  "www.iteblog.com:2181", "consumer-group", {"some-topic": 1})

lines = kafkaStream.map(lambda x: x[1])

  需要注意的是,这个实现还只是使用了旧的Kafka API,在Python中使用Direct API正在开发中,很有可能在Spark 1.4版本中可用。

  本文翻译自:https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark Streaming 1.3对Kafka整合的提升详解】(https://www.iteblog.com/archives/1307.html)
喜欢 (24)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!