由于数据类型不兼容、数据为空或者格式不兼容而导致无法加载到 CarbonData 中的记录归类为坏记录。
坏记录存储在 carbon.properties 文件 carbon.badRecords.location 属性设置的位置。默认情况,carbon.badRecords.location 设置的路径为 /opt/Carbon/Spark/badrecords
在加载数据时,我们可以指定处理坏记录的方法。如果想分析坏记录产生的原因,BAD_RECORDS_LOGGER_ENABLE
参数必须设置成 TRUE
。有很多方式来处理坏记录,我们可以通过设置 BAD_RECORDS_ACTION
参数。
'BAD_RECORDS_ACTION'='FORCE'
'BAD_RECORDS_ACTION'='REDIRECT'
如果想忽略将坏记录存储在 csv 文件中,请在查询中设置一下参数:
'BAD_RECORDS_ACTION'='IGNORE'
创建 carbon session 时指定的存储路径,在 CarbonData 中主要用于存储元数据,比如模式、字典文件,字典元数据以及排序索引。
尝试使用以下方式来在创建 carbonsession
的时候指定 storepath
:
val carbon = SparkSession.builder().config(sc.getConf)
.getOrCreateCarbonSession(<store_path>)
示例:
val carbon = SparkSession.builder().config(sc.getConf)
.getOrCreateCarbonSession("hdfs://localhost:9000/carbon/store")
Apache CarbonData 获取文件锁用于防止并发修改相同文件的操作。锁的类型取决于存储路径,对于 HDFS 路径,锁的类型为 HDFSLOCK。默认情况下,锁类型为 LOCALLOCK。 carbon.lock.type 属性指定在表上执行并发操作期间要获取锁的类型。 该属性可以使用以下值进行设置:
为了构建 CarbonData 工程,需要我们指定 spark 相关配置。spark 配置指定 Spark 的版本。当使用 Maven 构建工程时,你需要指定 spark version
。
Carbon 支持插入操作,你可以参考 CarbonData 上的 DML 操作 中提到的语法。首先,在 spark-sql 中创建一张源表,并将数据加载到已经创建好的的表中。
CREATE TABLE source_table(
id String,
name String,
city String)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";
SELECT * FROM source_table;
id name city
1 jack beijing
2 erlu hangzhou
3 davi shenzhen
场景 1 :
假设 carbon 表列的顺序与源表不同,使用语句 "SELECT * FROM carbon table" 进行查询时,将获得与源表一样的列顺序,而不是预期的 carbon 表列顺序。
CREATE TABLE IF NOT EXISTS carbon_table(
id String,
city String,
name String)
STORED BY 'carbondata';
INSERT INTO TABLE carbon_table SELECT * FROM source_table;
SELECT * FROM carbon_table;
id city name
1 jack beijing
2 erlu hangzhou
3 davi shenzhen
正如结果显示的,carbon 表的第二列是 city,但是查出来的是 name,比如 jack。这种现象和将数据插入 hive 表中一样。
如果你想将数据插入到 carbon 表对应的列中,则必须在插入语句中指定好列的顺序。
INSERT INTO TABLE carbon_table SELECT id, city, name FROM source_table;
场景 2 :
当 carbon 表列数与 select 语句中指定的列数不相同时,插入操作将失败。下面的插入操作将会失败。
INSERT INTO TABLE carbon_table SELECT id, city FROM source_table;
场景 3 :
当 carbon 表列类型与 select 语句中指定的列类型不相同时,插入操作仍然会成功,但是结果可能为 NULL,因为在转换类型失败时,将使用 NULL 替代。
以下聚合查询不会从聚合表中获取数据:
示例:
create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata';
create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp21 group by cntry;
select ctry from pop1 where ctry in (select cntry from gdp21 group by cntry);
示例:
create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata';
create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp21 group by cntry;
select cntry, sum(gdp) from gdp21 where cntry in (select ctry from pop1) group by cntry;
示例:
create table gdp21(cntry smallint, gdp double, y_year date) stored by 'carbondata';
create datamap ag1 on table gdp21 using 'preaggregate' as select cntry, sum(gdp) from gdp21 group by cntry;
select cntry,sum(gdp) from gdp21,pop1 where cntry=ctry group by cntry;
Spark executor 在重试次数达到最大值后才显示任务失败,但加载具有坏记录的数据并将 BAD_RECORDS_ACTION(carbon.bad.records.action)设置为 FAIL 将只尝试一次,并且将给驱动程序发送失败信号,而不是抛出异常来重试,因为如果发现坏记录并且 BAD_RECORDS_ACTION 设置为 FAIL,则无法重试。因此,Spark executor 显示这一尝试是成功的状态,但该命令实际上却执行失败了。 我们可以检查任务尝试或执行程序日志以观察失败的原因。
SDK writer 是一个独立的实体,因此 SDK writer 可以从具有不同时区的非群集机器生成 carbondata 文件。 但是在群集中读取这些文件时,它总是获取到群集时区。 因此,时间戳和日期数据类型字段的值不是原始值。 如果想要在写入时控制数据的时区,则通过调用以下 API 在 SDK writer 中设置集群的时区。
TimeZone.setDefault(timezoneValue)
示例:
cluster timezone is Asia/Shanghai
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))