Rheem

Rheem需要我们安装好Java8,然后根据自己的需求在pom.xml文件里面引入下面的依赖:

<dependency>
  <groupId>org.qcri.rheem</groupId>
  <artifactId>rheem-***</artifactId>
  <version>0.2.1</version>

</dependency>

注意上面的 ***,因为Rheem包含了很多个模块,我们需要根据自己的需求选择不同的模块,主要模块介绍如下:

  • rheem-core: 提供了核心数据结构和优化器,这个模块必须引入;
  • rheem-basic: 提供了通用的运算符和数据类型;
  • rheem-api: 提供了Java和Scala语言的API供大家使用;
  • rheem-java, rheem-spark, rheem-graphchi, rheem-sqlite3, rheem-postgres: 适用于各种平台的适配器
  • rheem-profiler: provides functionality to learn operator and UDF cost functions from historical execution data
  • 下面介绍如何使用RHEEM编写一个WordCount程序。这里以Scala API进行介绍:

    import org.qcri.rheem.api._
    import org.qcri.rheem.core.api.{Configuration, RheemContext}
    import org.qcri.rheem.java.Java
    import org.qcri.rheem.spark.Spark
    
    object WordcountScala {
      def main(args: Array[String]) {
    
        // Settings
        val inputUrl = "file:/tmp.txt"
    
        // Get a plan builder.
        val rheemContext = new RheemContext(new Configuration)
          .withPlugin(Java.basicPlugin)
          .withPlugin(Spark.basicPlugin)
        val planBuilder = new PlanBuilder(rheemContext)
          .withJobName(s"WordCount ($inputUrl)")
          .withUdfJarsOf(this.getClass)
    
        val wordcounts = planBuilder
          // Read the text file.
          .readTextFile(inputUrl).withName("Load file")
    
          // Split each line by non-word characters.
          .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")
    
          // Filter empty tokens.
          .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
    
          // Attach counter to each word.
          .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
    
          // Sum up counters for every word.
          .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
          .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
    
          // Execute the plan and collect the results.
          .collect()
    
        println(wordcounts)
      }
    }
    

    从上面的代码可以看出,这个代码的函数和处理过程和使用Spark或者Flink开发程序流程很类似,然后我们可以使用下面命令运行这个程序:

    java  com.iteblog.WordcountScala
    

    然后就可以在Spark上运行这个程序。更多关于RHEEM的介绍可以参见期官方文档介绍:https://github.com/daqcri/rheem

    本博客文章除特别声明,全部都是原创!
    原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
    本文链接: 【Rheem:可扩展且易于使用的跨平台大数据分析系统】(https://www.iteblog.com/archives/2087.html)
    喜欢 (3)
    分享 (0)
    发表我的评论
    取消评论

    表情
    本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!