设计
Hadoop 2.5-2.7 HBase 0.98.6 Spark 2.0.0-2.1.1 Sqoop 1.4.6
本段的重点在于讨论 HBase 表的设计,其中 RowKey 是最重要的部分。为了方便说明问题,我们先来看看数据格式。以下以 check 举例,opinion 同理。
check 表(原表字段有 18 个,为方便描述,本文截选 5 个字段示意)
如上图所示,主键为 id,32 位字母和数字随机组成,业务查询字段 check_id 为不定长字段(不超过 32 位),字母和数字组成,同一 check_id 可能对应多条记录,其他为相关业务字段。众所周知,HBase 是基于 RowKey 提供查询,且要求 RowKey 是唯一的。RowKey 的设计主要考虑的是数据将怎样被访问。初步来看,我们有 2 种设计方法。
第一种方法优点是表结构简单,RowKey 容易设计,缺点为 1)数据写入时,一行原始数据需要写入到 2 张表,且索引表写入前需要先扫描该 RowKey 是否存在,如果存在,则加入一列,否则新建一行,2)读取的时候,即便是采用 List, 也至少需要读取 2 次表。第二种设计方法,RowKey 设计较为复杂,但是写入和读取都是一次性的。综合考虑,我们采用第二种设计方法。
HBase 中的行是以 RowKey 的字典序排序的,其热点问题通常发生在大量的客户端直接访问集群的一个或极少数节点。默认情况下,在开始建表时,表只会有一个 region,并随着 region 增大而拆分成更多的 region,这些 region 才能分布在多个 regionserver 上从而使负载均分。对于我们的业务需求,存量数据已经较大,因此有必要在一开始就将 HBase 的负载均摊到每个 regionserver,即做 pre-split。常见的防治热点的方法为加盐,hash 散列,自增部分(如时间戳)翻转等。
Step1:确定预分区数目,创建 HBase Table
不同的业务场景及数据特点确定数目的方式不一样,我个人认为应该综合考虑数据量大小和集群大小等因素。比如 check 表大小约为 11G,测试集群大小为 10 台机器,hbase.hregion.max.filesize=3G(当 region 的大小超过这个数时,将拆分为 2 个),所以初始化时尽量使得一个 region 的大小为 1~2G(不会一上来就 split),region 数据分到 11G/2G=6 个,但为了充分利用集群资源,本文中 check 表划分为 10 个分区。如果数据量为 100G,且不断增长,集群情况不变,则 region 数目增大到 100G/2G=50 个左右较合适。Hbase check 表建表语句如下:
create 'tinawang:check', { NAME => 'f', COMPRESSION => 'SNAPPY',DATA_BLOCK_ENCODING => 'FAST_DIFF',BLOOMFILTER=>'ROW'}, {SPLITS => [ '1','2','3', '4','5','6','7','8','9']}
其中,Column Family =‘f’,越短越好。
COMPRESSION => 'SNAPPY',HBase 支持 3 种压缩 LZO, GZIP and Snappy。GZIP 压缩率高,但是耗 CPU。后两者差不多,Snappy 稍微胜出一点,cpu 消耗的比 GZIP 少。一般在 IO 和 CPU 均衡下,选择 Snappy。
DATA_BLOCK_ENCODING => 'FAST_DIFF',本案例中 RowKey 较为接近,通过以下命令查看 key 长度相对 value 较长。
./hbase org.apache.hadoop.hbase.io.hfile.HFile -m -f /apps/hbase/data/data/tinawang/check/a661f0f95598662a53b3d8b1ae469fdf/f/a5fefc880f87492d908672e1634f2eed_SeqId_2_
Step2:RowKey 组成
Salt
让数据均衡的分布到各个 Region 上,结合 pre-split,我们对查询键即 check 表的 check_id 求 hashcode 值,然后 modulus(numRegions) 作为前缀,注意补齐数据。
StringUtils.leftPad(Integer.toString(Math.abs(check_id.hashCode() % numRegion)),1,’0’)
说明:如果数据量达上百 G 以上,则 numRegions 自然到 2 位数,则 salt 也为 2 位。
Hash 散列
因为 check_id 本身是不定长的字符数字串,为使数据散列化,方便 RowKey 查询和比较,我们对 check_id 采用 SHA1 散列化,并使之 32 位定长化。
MD5Hash.getMD5AsHex(Bytes.toBytes(check_id))
唯一性
以上 salt+hash 作为 RowKey 前缀,加上 check 表的主键 id 来保障 RowKey 唯一性。综上,check 表的 RowKey 设计如下:(check_id=A208849559)
为增强可读性,中间还可以加上自定义的分割符,如’+’,’|’等。
7+7c9498b4a83974da56b252122b9752bf+56B63AB98C2E00B4E053C501380709AD
以上设计能保证对每次查询而言,其 salt+hash 前缀值是确定的,并且落在同一个 region 中。需要说明的是 HBase 中 check 表的各列同数据源 Oracle 中 check 表的各列存储。
RowKey 设计与查询息息相关,查询方式决定 RowKey 设计,反之基于以上 RowKey 设计,查询时通过设置 Scan 的 [startRow,stopRow], 即可完成扫描。以查询 check_id=A208849559 为例,根据 RowKey 的设计原则,对其进行 salt+hash 计算,得前缀。
startRow = 7+7c9498b4a83974da56b252122b9752bf stopRow = 7+7c9498b4a83974da56b252122b9752bg
Step0: prepare work
因为是从上游系统承接的业务数据,存量数据采用 sqoop 抽到 hdfs;增量数据每日以文件的形式从 ftp 站点获取。因为业务数据字段中包含一些换行符,且 sqoop1.4.6 目前只支持单字节,所以本文选择’0x01’作为列分隔符,’0x10’作为行分隔符。
Step1: Spark read hdfs text file
SparkContext.textfile() 默认行分隔符为”\n”,此处我们用“0x10”,需要在 Configuration 中配置。应用配置,我们调用 newAPIHadoopFile 方法来读取 hdfs 文件,返回 JavaPairRDD,其中 LongWritable 和 Text 分别为 Hadoop 中的 Long 类型和 String 类型(所有 Hadoop 数据类型和 java 的数据类型都很相像,除了它们是针对网络序列化而做的特殊优化)。我们需要的数据文件放在 pairRDD 的 value 中,即 Text 指代。为后续处理方便,可将 JavaPairRDD 转换为 JavaRDD< String >。
Step2: Transfer and sort RDD
① 将 avaRDD< String> 转换成 JavaPairRDD
② 基于 JavaPairRDD
java.io.IOException: Added a key notlexically larger than previous key
③ 将数据组织成 HFile 要求的 JavaPairRDDhfileRDD。
Step3:create hfile and bulk load to HBase
①主要调用 saveAsNewAPIHadoopFile 方法:
hfileRdd.saveAsNewAPIHadoopFile(hfilePath,ImmutableBytesWritable.class, KeyValue.class,HFileOutputFormat2.class,config);
② hfilebulk load to HBase
final Job job = Job.getInstance(); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); HFileOutputFormat2.configureIncrementalLoad(job,htable); LoadIncrementalHFiles bulkLoader = newLoadIncrementalHFiles(config); bulkLoader.doBulkLoad(newPath(hfilePath),htable);
注:如果集群开启了 kerberos,step4 需要放置在 ugi.doAs()方法中,在进行如下验证后实现
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(keyUser,keytabPath); UserGroupInformation.setLoginUser(ugi);
访问 HBase 集群的 60010 端口 web,可以看到 region 分布情况。
本文基于 spring boot 框架来开发 web 端访问 HBase 内数据。
创建连接是一个比较重的操作,在实际 HBase 工程中,我们引入连接池来共享 zk 连接,meta 信息缓存,region server 和 master 的连接。
HConnection connection = HConnectionManager.createConnection(config); HTableInterface table = connection.getTable("table1"); try { // Use the table as needed, for a single operation and a single thread } finally { table.close(); }
也可以通过以下方法,覆盖默认线程池。
HConnection createConnection(Configuration conf,ExecutorService pool);
Step1: 根据查询条件,确定 RowKey 前缀
根据 3.3 RowKey 设计介绍,HBase 的写和读都遵循该设计规则。此处我们采用相同的方法,将 web 调用方传入的查询条件,转化成对应的 RowKey 前缀。例如,查询 check 表传递过来的 check_id=A208849559,生成前缀 7+7c9498b4a83974da56b252122b9752bf。
Step2:确定 scan 范围
A208849559 对应的查询结果数据即在 RowKey 前缀为 7+7c9498b4a83974da56b252122b9752bf 对应的 RowKey 及 value 中。
scan.setStartRow(Bytes.toBytes(rowkey_pre)); //scan, 7+7c9498b4a83974da56b252122b9752bf byte[] stopRow = Bytes.toBytes(rowkey_pre); stopRow[stopRow.length-1]++; scan.setStopRow(stopRow);// 7+7c9498b4a83974da56b252122b9752bg
Step3:查询结果组成返回对象
遍历 ResultScanner 对象,将每一行对应的数据封装成 table entity,组成 list 返回。
从原始数据中随机抓取 1000 个 check_id,用于模拟测试,连续发起 3 次请求数为 2000(200 个线程并发,循环 10 次),平均响应时间为 51ms,错误率为 0。
如上图,经历 N 次累计测试后,各个 region 上的 Requests 数较为接近,符合负载均衡设计之初。
如果集群开启了安全认证,那么在进行 Spark 提交作业以及访问 HBase 时,均需要进行 kerberos 认证。
本文采用 yarn cluster 模式,像提交普通作业一样,可能会报以下错误。
ERROR StartApp: job failure, java.lang.NullPointerException at com.tinawang.spark.hbase.utils.HbaseKerberos.<init>(HbaseKerberos.java:18) at com.tinawang.spark.hbase.job.SparkWriteHbaseJob.run(SparkWriteHbaseJob.java:60)
定位到 HbaseKerberos.java:18,代码如下:
this.keytabPath = (Thread.currentThread().getContextClassLoader().getResource(prop.getProperty("hbase.keytab"))).getPath();
这是因为 executor 在进行 HBase 连接时,需要重新认证,通过 --keytab 上传的 tina.keytab 并未被 HBase 认证程序块获取到,所以认证的 keytab 文件需要另外通过 --files 上传。示意如下
--keytab /path/tina.keytab \ --principal tina@GNUHPC.ORG \ --files "/path/tina.keytab.hbase"
其中 tina.keytab.hbase 是将 tina.keytab 复制并重命名而得。因为 Spark 不允许同一个文件重复上传。
org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2101) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) ... org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext Serialization stack: - object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@24a16d8c) - field (class: com.tinawang.spark.hbase.processor.SparkReadFileRDD, name: sc, type: class org.apache.spark.api.java.JavaSparkContext) ...
解决方法一:
如果 sc 作为类的成员变量,在方法中被引用,则加 transient 关键字,使其不被序列化。
private transient JavaSparkContext sc;
解决方法二:
将 sc 作为方法参数传递,同时使涉及 RDD 操作的类 implements Serializable。 代码中采用第二种方法。详见代码。
Exception in thread "http-nio-8091-Acceptor-0" java.lang.NoClassDefFoundError: org/apache/tomcat/util/ExceptionUtils
或者
Exception in thread "http-nio-8091-exec-34" java.lang.NoClassDefFoundError: ch/qos/logback/classic/spi/ThrowableProxy
查看下面 issue 以及一次排查问题的过程,可能是 open file 超过限制。
https://github.com/spring-projects/spring-boot/issues/1106
http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg
使用 ulimit-a 查看每个用户默认打开的文件数为 1024。
在系统文件 /etc/security/limits.conf 中修改这个数量限制,在文件中加入以下内容, 即可解决问题。
作者介绍
汪婷,中国民生银行大数据开发工程师,专注于 Spark 大规模数据处理和 Hbase 系统设计。
参考文献
http://hbase.apache.org/book.html#perf.writing
http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/
http:/hbasefly.com/2016/03/23/hbase_writer/
https://github.com/spring-projects/spring-boot/issues/1106
http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg