select count(*), host, module from ewaplog group by host, module order by host, module;

下面我们先来看看Spark SQL核心的代码(关于Spark SQL的详细介绍请参见Spark官方文档,这里我就不介绍了。):

/**
 * User: 过往记忆
 * Date: 14-8-13
 * Time: 下午23:16
 * bolg: 
 * 本文地址:/archives/1090
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

public static class Entry implements Serializable {
        private String host;
        private String module;
        private String content;

        public Entry(String host, String module, String content) {
            this.host = host;
            this.module = module;
            this.content = content;
        }

        public String getHost() {
            return host;
        }

        public void setHost(String host) {
            this.host = host;
        }

        public String getModule() {
            return module;
        }

        public void setModule(String module) {
            this.module = module;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return "[" + host + "\t" + module + "\t" + content + "]";
        }
}

.......

JavaSparkContext ctx = ...
JavaSQLContext sqlCtx = ...
JavaRDD<Entry> stringJavaRDD = ctx.textFile(args[0]).map(
      new Function<String, Entry>() {
            @Override
            public Entry call(String str) throws Exception {
                String[] split = str.split("\u0001");
                if (split.length < 3) {
                    return new Entry("", "", "");
                }

                return new Entry(split[0], split[1], split[2]);
            }
});

JavaSchemaRDD schemaPeople = sqlCtx.applySchema(stringJavaRDD, Entry.class);
schemaPeople.registerAsTable("entry");
JavaSchemaRDD teenagers = sqlCtx.sql("select count(*), host, module " +
                "from entry " +
                "group by host, module " +
                "order by host, module");

List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
     public String call(Row row) {
          return row.getLong(0) + "\t" + 
                  row.getString(1) + "\t" + row.getString(2);
     }
}).collect();

for (String name : teenagerNames) {
            System.out.println(name);
}

Spark Hive核心代码:

/**
 * User: 过往记忆
 * Date: 14-8-23
 * Time: 下午23:16
 * bolg: 
 * 本文地址:/archives/1090
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
JavaHiveContext hiveContext =....;
JavaSchemaRDD result = hiveContext.hql("select count(*), host, module " +
                "from ewaplog " +
                "group by host, module " +
                "order by host, module");
List<Row> collect = result.collect();
for (Row row : collect) {
    System.out.println(row.get(0) + "\t" + row.get(1) + "\t" + row.get(2));
}

  大家可以看到Spark Hive核心代码里面的SQL语句和直接在Hive上面执行一样,在执行这个代码的时候,需要确保ewaplog存在。而且在运行这个程序的时候需要依赖Hive的一些jar包,需要依赖Hive的元数据等信息。对Hive的依赖比较大。而Spark SQL直接读取lzo文件,并没有涉及到Hive,相比Spark Hive依赖性这方便很好。Spark SQL直接读取lzo文件,然后将数据存放在RDD中,applySchema方法将JavaRDD转换成JavaSchemaRDD,我们来看看文档是怎么来描述的

  At the core of this component is a new type of RDD, SchemaRDD. SchemaRDDs are composed Row objects along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, Parquet file, a JSON dataset, or by running HiveQL against data stored in Apache Hive.

转换成JavaSchemaRDD之后,我们可以用用registerAsTable将它注册到表中,之后就可以通过JavaSQLContext的sql方法来执行相应的sql语句了。
  用Maven编译完上面的程序之后,放到Hadoop集群上面运行:

iteblog@Spark $ spark-submit --master yarn-cluster  
                             --jars lib/spark-sql_2.10-1.0.0.jar
                             --class SparkSQLTest
                             --queue queue1
                             ./spark-1.0-SNAPSHOT.jar 
                             /home/wyp/test/*.lzo

分别经过了20分钟左右的时间,Spark SQL和Spark Hive都可以运行完,结果如下:

39511517	bokingserver1	CN1_hbase_android_client
59141803	bokingserver1	CN1_hbase_iphone_client
39544052	bokingserver2	CN1_hbase_android_client
59156743	bokingserver2	CN1_hbase_iphone_client
23472413	bokingserver3	CN1_hbase_android_client
35251936	bokingserver3	CN1_hbase_iphone_client
23457708	bokingserver4	CN1_hbase_android_client
35262400	bokingserver4	CN1_hbase_iphone_client
19832715	bokingserver5	CN1_hbase_android_client
51003885	bokingserver5	CN1_hbase_iphone_client
19831076	bokingserver6	CN1_hbase_android_client
50997314	bokingserver6	CN1_hbase_iphone_client
30526207	bokingserver7	CN1_hbase_android_client
50702806	bokingserver7	CN1_hbase_iphone_client
54844214	bokingserver8	CN1_hbase_android_client
88062792	bokingserver8	CN1_hbase_iphone_client
54852596	bokingserver9	CN1_hbase_android_client
88043401	bokingserver9	CN1_hbase_iphone_client
54864322	bokingserver10	CN1_hbase_android_client
88041583	bokingserver10	CN1_hbase_iphone_client
54891529	bokingserver11	CN1_hbase_android_client
88007489	bokingserver11	CN1_hbase_iphone_client
54613917	bokingserver12	CN1_hbase_android_client
87623763	bokingserver12	CN1_hbase_iphone_client

  为了比较基于Spark的任务确实比基于Mapreduce的快,我特意用Hive执行了同样的任务,如下:

hive> select count(*), host, module from ewaplog
    > group by host, module order by host, module;

Job 0: Map: 2845  Reduce: 364   Cumulative CPU: 17144.59 sec
HDFS Read: 363542156311 HDFS Write: 36516 SUCCESS
Job 1: Map: 1  Reduce: 1   Cumulative CPU: 4.82 sec
HDFS Read: 114193 HDFS Write: 1260 SUCCESS
Total MapReduce CPU Time Spent: 0 days 4 hours 45 minutes 49 seconds 410 msec
OK
39511517	bokingserver1	CN1_hbase_android_client
59141803	bokingserver1	CN1_hbase_iphone_client
39544052	bokingserver2	CN1_hbase_android_client
59156743	bokingserver2	CN1_hbase_iphone_client
23472413	bokingserver3	CN1_hbase_android_client
35251936	bokingserver3	CN1_hbase_iphone_client
23457708	bokingserver4	CN1_hbase_android_client
35262400	bokingserver4	CN1_hbase_iphone_client
19832715	bokingserver5	CN1_hbase_android_client
51003885	bokingserver5	CN1_hbase_iphone_client
19831076	bokingserver6	CN1_hbase_android_client
50997314	bokingserver6	CN1_hbase_iphone_client
30526207	bokingserver7	CN1_hbase_android_client
50702806	bokingserver7	CN1_hbase_iphone_client
54844214	bokingserver8	CN1_hbase_android_client
88062792	bokingserver8	CN1_hbase_iphone_client
54852596	bokingserver9	CN1_hbase_android_client
88043401	bokingserver9	CN1_hbase_iphone_client
54864322	bokingserver10	CN1_hbase_android_client
88041583	bokingserver10	CN1_hbase_iphone_client
54891529	bokingserver11	CN1_hbase_android_client
88007489	bokingserver11	CN1_hbase_iphone_client
54613917	bokingserver12	CN1_hbase_android_client
87623763	bokingserver12	CN1_hbase_iphone_client
Time taken: 1818.706 seconds, Fetched: 24 row(s)

  从上面的显示我们可以看出,Hive执行同样的任务用了30分钟,而Spark用了20分钟,也就是省了1/3的时间,还是很快的。在运行的过程中,我发现Spark消耗内存比较大,在程序运行期间,三个子节点负载很高,整个队列的资源消耗了一半以上。我想如果集群的机器数据更多的话,Spark的运行速度应该还会有一些提升。好了今天就说到这,欢迎关注本博客。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark SQL & Spark Hive编程开发, 并和Hive执行效率对比】(https://www.iteblog.com/archives/1090.html)
喜欢 (51)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(9)个小伙伴在吐槽
  1. 我能找到HiveContext但是找不到这里的JavaHiveContext.请问这个需要依赖什么jar包吗?

    给Zji^_^2016-04-16 11:55 回复
  2. 从Spark 1.3.0版本开始,JavaHiveConcext.scala文件已经被移除了。只能用HiveConcext。

    w3970907702016-04-18 16:20 回复
  • spark hive 的操作,对于hive的依赖包,怎么弄得? JavaHiveContext ,JavaSchemaRDD 我都没,求指教怎么弄依赖包。
    在线等。谢谢

    奥啦2015-07-09 18:06 回复
  • 你是单独建立工程还是直接在shell上面运行的?

    w3970907702015-07-09 18:23 回复
  • 单独建立工程。

    奥啦2015-07-09 18:56 回复
  • "org.apache.spark" % "spark-sql_2.10" % "1.2.0"
    "org.apache.spark" % "spark-hive_2.10" % "1.2.0"
    


    导入上面的两个jar包试试。

    w3970907702015-07-09 19:27 回复
  • 有用,非常感谢

    奥啦2015-07-09 22:46
  • 这点提升感觉对于离线数据处理意义不是太大,特别再考虑到对内存的消耗,感觉从稳定性上考虑,这个代价不是很值得

    sky880882014-09-28 11:01 回复
  • 非常感谢

    asen2014-09-09 17:09 回复