下载并解压 spark-2.2.0-bin-hadoop2.7.tgz, 并且设置好 $SPARK_HOME 。
打包好 carbon jar, 并将 assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar 文件复制到 $SPARK_HOME/jars 目录下。
mvn clean package -DskipTests -Pspark-2.2
在新的终端启动 spark-shell ,在里面输入 :paste, 将下面的代码复制到里面并运行:
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonStorePath
val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath
val spark = SparkSession
.builder()
.master("local")
.appName("preAggregateExample")
.config("spark.sql.warehouse.dir", warehouse)
.getOrCreateCarbonSession(warehouse, metastore)
spark.sparkContext.setLogLevel("ERROR")
// drop table if exists previously
spark.sql(s"DROP TABLE IF EXISTS sales")
// Create main table
spark.sql(
s"""
| CREATE TABLE sales (
| user_id string,
| country string,
| quantity int,
| price bigint)
| STORED BY 'carbondata'
""".stripMargin)
// Create pre-aggregate table on the main table
// If main table already have data, following command
// will trigger one immediate load to the pre-aggregate table
spark.sql(
s"""
| CREATE DATAMAP agg_sales
| ON TABLE sales
| USING "preaggregate"
| AS
| SELECT country, sum(quantity), avg(price)
| FROM sales
| GROUP BY country
""".stripMargin)
import spark.implicits._
import org.apache.spark.sql.SaveMode
import scala.util.Random
// Load data to the main table, it will also
// trigger immediate load to pre-aggregate table.
// These two loading operation is carried out in a
// transactional manner, meaning that the whole
// operation will fail if one of the loading fails
val r = new Random()
spark.sparkContext.parallelize(1 to 10)
.map(x => ("ID." + r.nextInt(100000), "country" + x % 8, x % 50, x % 60))
.toDF("user_id", "country", "quantity", "price")
.write
.format("carbondata")
.option("tableName", "sales")
.option("compress", "true")
.mode(SaveMode.Append)
.save()
spark.sql(
s"""
|SELECT country, sum(quantity), avg(price)
| from sales GROUP BY country
""".stripMargin).show
spark.stop
可以通过下面 DDL 创建 DataMap
CREATE DATAMAP [IF NOT EXISTS] datamap_name
ON TABLE main_table
USING "datamap_provider"
DMPROPERTIES ('key'='value', ...)
AS
SELECT statement
USING 之后的字符串称为 DataMap Provider,这个版本的 CarbonData 支持以下两种 DataMap:
可以使用下面 DDL 删除 DataMap
DROP DATAMAP [IF EXISTS] datamap_name
ON TABLE main_table
使用下面命令显示所有已经创建的 DataMaps:
SHOW DATAMAP
ON TABLE main_table
上面命令将显示出 main_table 上创建的所有 DataMaps。
预聚合表以 DataMaps 形式创建,在内部由 CarbonData 作为表进行管理。 只要存储要求和加载速度可以接受,用户可以创建许多预聚合 datamaps 以提高查询性能。
预聚合 datamaps 一旦被创建,CarbonData 的 SparkSQL 优化器将选择最有效的预聚合 datamaps,并重写 SQL 直接从已选定的 datamap 查询而不查询主表。 因为预聚合 datamaps 的数据大小比较小,所有用户的查询非常快。根据我们之前的经验,生产线上的 SQL 速度提高了 5 倍到 100 倍
比如,主表称为 sales ,它的定义如下:
CREATE TABLE sales (
order_time timestamp,
user_id string,
sex string,
country string,
quantity int,
price bigint)
STORED BY 'carbondata'
用户可以使用 Create DataMap DDL 来创建预聚合表:
CREATE DATAMAP agg_sales
ON TABLE sales
USING "preaggregate"
AS
SELECT country, sex, sum(quantity), avg(price)
FROM sales
GROUP BY country, sex
当用户提交查询时,在查询计划阶段,CarbonData 将根据关系代数(Relational Algebra)转换规则收集所有匹配的预聚合表作为候选者。然后,根据成本计算从候选表中选择用于此查询的最佳预聚合表。 为了简单起见,当前成本估算是基于预聚合表数据大小(我们认为在小表上查询速度会更快)。
对于上面创建的主表 sales 以及预聚合表 agg_sales ,下面查询
SELECT country, sex, sum(quantity), avg(price) from sales GROUP BY country, sex
SELECT sex, sum(quantity) from sales GROUP BY sex
SELECT avg(price), country from sales GROUP BY country
将会被 CarbonData 的查询计划转换成查询预聚合表 agg_sales 而不是主表 sales
然而下面的查询
SELECT user_id, country, sex, sum(quantity), avg(price) from sales GROUP BY user_id, country, sex
SELECT sex, avg(quantity) from sales GROUP BY sex
SELECT country, max(price) from sales GROUP BY country
将只能在主表 sales 上查询,因为它不满足预聚合表的查询逻辑。
对已经加载数据的现有表,当用户创建预聚合表时,将数据加载到预聚合表由 CREATE DATAMAP 语句触发。 对于预聚合表创建之后增量数据的加载,在主表加载数据完成之后,主表会触发将数据加载到预聚合表中。
这些加载操作时事务性的,意味着只有所有表的数据加载成功,主表以及预聚合表的数据才对用户可见。如果其中一个加载出现失败,新的数据不会在所有的表中可见,就好像没有进行过加载操作一样。
作为查询加速技术,我们不能直接查询预聚合表,查询将在主表上进行。 在执行查询计划时, CarbonData 内部将检查与主表关联的预聚合表,并相应地执行查询计划转换。
用户可以通过执行 EXPLAIN
命令来验证查询是否可以利用预聚合表,这将显示转换的逻辑计划,因此用户可以检查是否选择了预聚合表。
在主表上运行压缩命令(ALTER TABLE COMPACT
)不会自动压缩在主表上创建的预聚合表。
用户需要在每个预聚合表上分别运行压缩命令来压缩它们。
预聚合表的压缩是可选的操作。如果在主表上执行压缩但未在预聚合表上执行压缩,所有的查询仍可受益于预聚合表。 为了进一步提高查询性能,对预聚合表的压缩会触发预聚合表中的段和文件的合并。
在当前实现中,主表和预聚合表都需要维护数据的一致性。在主表创建预聚合表之后,不支持在主表上执行以下命令:
UPDATE/DELETE/DELETE SEGMENT
.ALTER TABLE DROP COLUMN
, ALTER TABLE CHANGE DATATYPE
,
ALTER TABLE RENAME
. 注意,添加新列是支持的;而删除列或者更新列的数据类型,CarbonData 将会检测其是否会影响预聚合表,如果不会,则操作允许进行;否则操作将会被拒绝,并且会抛出异常。ALTER TABLE ADD/DROP PARTITION
但是,仍然有办法在主表上执行这些操作,在当前 CarbonData 版本中,用户可以执行如下操作:
DROP DATAMAP
命令删除预聚合表。CREATE DATAMAP
命令创建预聚合表。基本上,用户可以手动触发来重新构建 datamap。