[/code]
下面就是Spark Scala REPL shell的简单实例:
scala> val hamlet = sc.textFile("~/temp/gutenburg.txt") hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
在上面的代码中,我们读取了文件,并创建了一个String类型的RDD,每一个String代表文件中的每一行。
scala> val topWordCount = hamlet.flatMap(str=>str.split(" ")) .filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_) .map{case (word, count) => (count, word)}.sortByKey(false) topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14
1、通过上述命令我们可以发现这个操作非常简单——通过简单的Scala API来连接transformations和actions。
2、可能存在某些words被1个以上空格分隔的情况,导致有些words是空字符串,因此需要使用filter(!_.isEmpty)将它们过滤掉。
3、每个word都被映射成一个键值对:map(word=>(word,1))。
4、为了合计所有计数,这里需要调用一个reduce步骤——reduceByKey(_+_)。 _+_ 可以非常便捷地为每个key赋值。
5、我们得到了words以及各自的counts,下一步需要做的是根据counts排序。在Apache Spark,用户只能根据key排序,而不是值。因此,这里需要使用map{case (word, count) => (count, word)}将(word, count)流转到(count, word)。
6、需要计算最常用的5个words,因此需要使用sortByKey(false)做一个计数的递减排序。
scala> topWordCount.take(5).foreach(x=>println(x)) (1044,the) (730,and) (679,of) (648,to) (511,I)
上述命令包含了一个.take(5) (an action operation, which triggers computation)和在 ~/temp/gutenburg.txt文本中输出10个最常用的words。在Python shell中用户可以实现同样的功能。
RDD lineage可以通过toDebugString(一个值得记住的操作)来跟踪:
scala> topWordCount.toDebugString res8: String = MapPartitionsRDD[19] at sortByKey at <console>:14 ShuffledRDD[18] at sortByKey at <console>:14 MappedRDD[17] at map at <console>:14 MapPartitionsRDD[16] at reduceByKey at <console>:14 ShuffledRDD[15] at reduceByKey at <console>:14 MapPartitionsRDD[14] at reduceByKey at <console>:14 MappedRDD[13] at map at <console>:14 FilteredRDD[12] at filter at <console>:14 FlatMappedRDD[11] at flatMap at <console>:14 MappedRDD[1] at textFile at <console>:12 HadoopRDD[0] at textFile at <console>:12
scala> val rdd = sc.parallelize(List("ABC","BCD","DEF")) scala> val filtered = rdd.filter(_.contains("C")) scala> filtered.collect()
Result:
Array[String] = Array(ABC, BCD)
scala> val rdd=sc.parallelize(List(1,2,3,4,5)) scala> val times2 = rdd.map(_*2) scala> times2.collect()
Result:
Array[Int] = Array(2, 4, 6, 8, 10)
scala> val rdd=sc.parallelize(List("Spark is awesome","It is fun")) scala> val fm=rdd.flatMap(str=>str.split(" ")) scala> fm.collect()
Result:
Array[String] = Array(Spark, is, awesome, It, is, fun)
scala> val word1=fm.map(word=>(word,1)) scala> val wrdCnt=word1.reduceByKey(_+_) scala> wrdCnt.collect()
Result:
Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))
scala> val cntWrd = wrdCnt.map{case (word, count) => (count, word)} scala> cntWrd.groupByKey().collect()
Result:
Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark,
fun)), (2,ArrayBuffer(is)))
scala> fm.distinct().collect()
Result:
Array[String] = Array(is, It, awesome, Spark, fun)
scala> val rdd1=sc.parallelize(List(‘A’,’B’)) scala> val rdd2=sc.parallelize(List(‘B’,’C’)) scala> rdd1.union(rdd2).collect()
Result:
Array[Char] = Array(A, B, B, C)
scala> rdd1.intersection(rdd2).collect()
Result:
Array[Char] = Array(B)
scala> rdd1.cartesian(rdd2).collect()
Result:
Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C))
scala> rdd1.subtract(rdd2).collect()
Result:
Array[Char] = Array(A)
scala> val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot"))) scala> val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista"))) scala> personFruit.join(personSE).collect()
Result:
Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)),
(Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))
scala> personFruit.cogroup(personSe).collect()
Result:
Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy,(ArrayBuffer(Apple,
Apricot),ArrayBuffer(google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))),
(Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista))))
scala> val rdd = sc.parallelize(list('A','B','c')) scala> rdd.count()
Result:
long = 3
scala> val rdd = sc.parallelize(list('A','B','c')) scala> rdd.collect()
Result:
Array[char] = Array(A, B, c)
scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.reduce(_+_)
Result:
Int = 10
scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.take(2)
Result:
Array[Int] = Array(1, 2)
scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.foreach(x=>println("%s*10=%s". format(x,x*10)))
Result:
1*10=10 4*10=40 3*10=30 2*10=20
scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.first()
Result:
Int = 1
scala> val hamlet = sc.textFile("~/temp/gutenburg.txt") scala> hamlet.filter(_.contains("Shakespeare")).saveAsTextFile("~/temp/filtered")
Result:
akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001
Apache Spark中一个主要的能力就是在集群内存中持久化/缓存RDD。这将显著地提升交互速度。下表显示了Spark中各种选项:
上面的存储等级可以通过RDD. cache()操作上的 persist()操作访问,可以方便地指定MEMORY_ONLY选项。关于持久化等级的更多信息,可以访问这里http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。
Spark使用Least Recently Used (LRU)算法来移除缓存中旧的、不常用的RDD,从而释放出更多可用内存。同样还提供了一个unpersist() 操作来强制移除缓存/持久化的RDD。
Spark提供了一个非常便捷地途径来避免可变的计数器和计数器同步问题——Accumulators。Accumulators在一个Spark context中通过默认值初始化,这些计数器在Slaves节点上可用,但是Slaves节点不能对其进行读取。它们的作用就是来获取原子更新,并将其转发到Master。Master是唯一可以读取和计算所有更新合集的节点。举个例子:
test@localhost~/temp$ cat output.log error warning info trace error info info scala> val nErrors=sc.accumulator(0.0) scala> val logs = sc.textFile("/Users/akuntamukkala/temp/output.log") scala> logs.filter(_.contains("error")).foreach(x=>nErrors+=1) scala> nErrors.value Result:Int = 2
实际生产中,通过指定key在RDDs上对数据进行合并的场景非常常见。在这种情况下,很可能会出现给slave nodes发送大体积数据集的情况,让其负责托管需要做join的数据。因此,这里很可能存在巨大的性能瓶颈,因为网络IO比内存访问速度慢100倍。为了解决这个问题,Spark提供了Broadcast Variables,如其名称一样,它会向slave nodes进行广播。因此,节点上的RDD操作可以快速访问Broadcast Variables值。举个例子,期望计算一个文件中所有路线项的运输成本。通过一个look-up table指定每种运输类型的成本,这个look-up table就可以作为Broadcast Variables。
test@localhost~/temp$ cat packagesToShip.txt ground express media priority priority ground express media scala> val map = sc.parallelize(Seq(("ground",1),("med",2), ("priority",5),("express",10))).collect().toMap map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10) scala> val bcMailRates = sc.broadcast(map)
上述命令中,我们建立了一个broadcast variable,基于服务类别成本的map。
scala> val pts = sc.textFile("~/temp/packagesToShip.txt") scala> pts.map(shipType=>(shipType,1)).reduceByKey(_+_). map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates. value(shipType))}.collect()
在上述命令中,我们通过broadcast variable的mailing rates来计算运输成本。
Array[(String, Int)] = Array((priority,10), (express,20), (media,4), (ground,2)) scala> val shippingCost=sc.accumulator(0.0) scala> pts.map(x=>(x,1)).reduceByKey(_+_).map{case (x,y)=>(x,y*bcMailRates.value(x))}.foreach(v=>shippingCost+=v._2) scala> shippingCost.value Result: Double = 36.0
通过上述命令,我们使用accumulator来累加所有运输的成本。详细信息可通过下面的PDF查看http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf。
通过Spark Engine,Spark SQL提供了一个便捷的途径来进行交互式分析,使用一个被称为SchemaRDD类型的RDD。SchemaRDD可以通过已有RDDs建立,或者其他外部数据格式,比如Parquet files、JSON数据,或者在Hive上运行HQL。SchemaRDD非常类似于RDBMS中的表格。一旦数据被导入SchemaRDD,Spark引擎就可以对它进行批或流处理。Spark SQL提供了两种类型的Contexts——SQLContext和HiveContext,扩展了SparkContext的功能。
SparkContext提供了到简单SQL parser的访问,而HiveContext则提供了到HiveQL parser的访问。HiveContext允许企业利用已有的Hive基础设施。
这里看一个简单的SQLContext示例,下面文本中的用户数据通过"|"来分割。
John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854
定义Scala case class来表示每一行:
case class Customer(name:String,age:Int,gender:String,address: String)
下面的代码片段体现了如何使用SparkContext来建立SQLContext,读取输入文件,将每一行都转换成SparkContext中的一条记录,并通过简单的SQL语句来查询30岁以下的男性用户。
val sparkConf = new SparkConf().setAppName("Customers") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val r = sc.textFile("/Users/akuntamukkala/temp/customers.txt") val records = r.map(_.split('|')) val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable("customers") sqlContext.sql("select * from customers where gender='M' and age < 30").collect().foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris, TX,75461]
更多使用SQL和HiveQL的示例请访问下面链接https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html。
Spark Streaming提供了一个可扩展、容错、高效的途径来处理流数据,同时还利用了Spark的简易编程模型。从真正意义上讲,Spark Streaming会将流数据转换成micro batches,从而将Spark批处理编程模型应用到流用例中。这种统一的编程模型让Spark可以很好地整合批量处理和交互式流分析。下图显示了Spark Streaming可以从不同数据源中读取数据进行分析。
Spark Streaming中的核心抽象是Discretized Stream(DStream)。DStream由一组RDD组成,每个RDD都包含了规定时间(可配置)流入的数据。图12很好地展示了Spark Streaming如何通过将流入数据转换成一系列的RDDs,再转换成DStream。每个RDD都包含两秒(设定的区间长度)的数据。在Spark Streaming中,最小长度可以设置为0.5秒,因此处理延时可以达到1秒以下。
Spark Streaming同样提供了 window operators,它有助于更有效率在一组RDD( a rolling window of time)上进行计算。同时,DStream还提供了一个API,其操作符(transformations和output operators)可以帮助用户直接操作RDD。下面不妨看向包含在Spark Streaming下载中的一个简单示例。示例是在Twitter流中找出趋势hashtags,详见下面代码:
spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala val sparkConf = new SparkConf().setAppName("TwitterPopularTags") val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters)
上述代码用于建立Spark Streaming Context。Spark Streaming将在DStream中建立一个RDD,包含了每2秒流入的tweets。
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
上述代码片段将Tweet转换成一组words,并过滤出所有以a#开头的。
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))
上述代码展示了如何整合计算60秒内一个hashtag流入的总次数。
topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} })
上面代码将找出top 10趋势tweets,然后将其打印。
ssc.start()
上述代码让Spark Streaming Context 开始检索tweets。一起聚焦一些常用操作,假设我们正在从一个socket中读入流文本。
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
brandNames.txt
coke
nike
sprite
reebok
This helps in computing a running aggregate of total number of times a word has occurred
更多operators请访问http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations
Spark Streaming拥有大量强大的output operators,比如上文提到的 foreachRDD(),了解更多可访问 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations。
Wikipedia article (good): http://en.wikipedia.org/wiki/Apache_Spark
Launching a Spark cluster on EC2: http://ampcamp.berkeley.edu/exercises-strata-conf-2013/launching-a-cluster.html
Quick start: https://spark.apache.org/docs/1.0.1/quick-start.html
The Spark platform provides MLLib(machine learning) and GraphX(graph algorithms). The following links provide more information:https://spark.apache.org/docs/latest/mllib-guide.html、https://spark.apache.org/docs/1.0.1/graphx-programming-guide.html、https://dzone.com/refcardz/apache-spark