Rheem
Rheem需要我们安装好Java8,然后根据自己的需求在pom.xml文件里面引入下面的依赖:
<dependency> <groupId>org.qcri.rheem</groupId> <artifactId>rheem-***</artifactId> <version>0.2.1</version> </dependency>
注意上面的 ***
,因为Rheem包含了很多个模块,我们需要根据自己的需求选择不同的模块,主要模块介绍如下:
rheem-core
: 提供了核心数据结构和优化器,这个模块必须引入;rheem-basic
: 提供了通用的运算符和数据类型;rheem-api
: 提供了Java和Scala语言的API供大家使用;rheem-java
, rheem-spark
, rheem-graphchi
, rheem-sqlite3
, rheem-postgres
: 适用于各种平台的适配器rheem-profiler
: provides functionality to learn operator and UDF cost functions from historical execution data下面介绍如何使用RHEEM编写一个WordCount程序。这里以Scala API进行介绍:
import org.qcri.rheem.api._ import org.qcri.rheem.core.api.{Configuration, RheemContext} import org.qcri.rheem.java.Java import org.qcri.rheem.spark.Spark object WordcountScala { def main(args: Array[String]) { // Settings val inputUrl = "file:/tmp.txt" // Get a plan builder. val rheemContext = new RheemContext(new Configuration) .withPlugin(Java.basicPlugin) .withPlugin(Spark.basicPlugin) val planBuilder = new PlanBuilder(rheemContext) .withJobName(s"WordCount ($inputUrl)") .withUdfJarsOf(this.getClass) val wordcounts = planBuilder // Read the text file. .readTextFile(inputUrl).withName("Load file") // Split each line by non-word characters. .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words") // Filter empty tokens. .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words") // Attach counter to each word. .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter") // Sum up counters for every word. .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters") .withCardinalityEstimator((in: Long) => math.round(in * 0.01)) // Execute the plan and collect the results. .collect() println(wordcounts) } }
从上面的代码可以看出,这个代码的函数和处理过程和使用Spark或者Flink开发程序流程很类似,然后我们可以使用下面命令运行这个程序:
java com.iteblog.WordcountScala
然后就可以在Spark上运行这个程序。更多关于RHEEM的介绍可以参见期官方文档介绍:https://github.com/daqcri/rheem
本博客文章除特别声明,全部都是原创!