下载并解压 spark-2.2.0-bin-hadoop2.7.tgz, 并且设置好 $SPARK_HOME 。
打包好 carbon jar, 并将 assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar 文件复制到 $SPARK_HOME/jars 目录下。
mvn clean package -DskipTests -Pspark-2.2
在终端启动套接字数据服务器
nc -lk 9099
在里面输入以下格式的 CSV 行数据
1,col1
2,col2
3,col3
4,col4
5,col5
在新的终端启动 spark-shell ,在里面输入 :paste
, 将下面的代码复制到里面并运行:
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonTablePath
val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath
val spark = SparkSession
.builder()
.master("local")
.appName("StreamExample")
.config("spark.sql.warehouse.dir", warehouse)
.getOrCreateCarbonSession(warehouse, metastore)
spark.sparkContext.setLogLevel("ERROR")
// drop table if exists previously
spark.sql(s"DROP TABLE IF EXISTS carbon_table")
// Create target carbon table and populate with initial data
spark.sql(
s"""
| CREATE TABLE carbon_table (
| col1 INT,
| col2 STRING
| )
| STORED BY 'carbondata'
| TBLPROPERTIES('streaming'='true')""".stripMargin)
val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
val tablePath = carbonTable.getTablePath
// batch load
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9099)
.load()
// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
.option("dbName", "default")
.option("tableName", "carbon_table")
.start()
// start new thread to show data
new Thread() {
override def run(): Unit = {
do {
spark.sql("select * from carbon_table").show(false)
Thread.sleep(10000)
} while (true)
}
}.start()
qry.awaitTermination()
继续在数据服务器中输入一些行数据,spark-shell 将会显示表的新数据。
流式表(Streaming table)只是一个普通的具有流式属性的 carbon 表,用户可以使用下面的 DDL 创建流式表。
CREATE TABLE streaming_table (
col1 INT,
col2 STRING
)
STORED BY 'carbondata'
TBLPROPERTIES('streaming'='true')
"DESC FORMATTED" 命令将会显示流式属性。
DESC FORMATTED streaming_table
对于旧表,使用 ALTER TABLE 命令来设置流式属性。
ALTER TABLE streaming_table SET TBLPROPERTIES('streaming'='true')
在流式数据摄取开始时,系统将尝试获取名为 streaming.lock 的表级锁文件。如果系统无法获取这张表的锁,将会抛出 InterruptedException。
流式输入的数据将会被摄入到 CarbonData 表的 segment 中,这个 segment 的状态是 streaming。在 CarbonData 中称其为流式段(streaming segment)。tablestatus 文件将会记录段的状态和数据大小。用户可以使用 SHOW SEGMENTS FOR TABLE tableName 来检查段状态。
当流式段达到了最大大小,CarbonData 将段的状态从 streaming 修改成 streaming finish,并且创建一个新的流式段以便继续摄取流式数据。
使用下面命令将段的状态从 "streaming" 转变成 "streaming finish"。如果流式应用程序正在运行,则此命令将被阻塞。
ALTER TABLE streaming_table FINISH STREAMING
使用下面命令手动将 "streaming finish" 段传递给列式段
ALTER TABLE streaming_table COMPACT 'streaming'
配置 carbon.streaming.auto.handoff.enabled 属性来自动传递流式段。如果这个属性的值为 true,当流式段达到最大大小,CarbonData 会将这个段的状态置为 streaming finish,并且在一个新线程自动触发将此段递交到一个列式格式的段。
通过 carbon.stream.parser 参数配置流数据解析器,以便在写流数据是将 InternalRow 转换成 Object[]。
当前, CarbonData 支持以下两种解析器:
1. org.apache.carbondata.streaming.parser.CSVStreamParserImp: 这是默认的流数据解析器,它从 InternalRow 的第一个索引获取行数据(字符串类型)并将此字符串转换为 Object []。
2. org.apache.carbondata.streaming.parser.RowStreamParserImp: 这个流解析器自动根据 DataSet
的模式将 InternalRow 转换成 Object[], 比如:
case class FileElement(school: Array[String], age: Int)
case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
...
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9099)
.load()
.as[String]
.map(_.split(","))
.map { fields => {
val tmp = fields(4).split("\\$")
val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
} }
// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("dbName", "default")
.option("tableName", "carbon_table")
.option(CarbonStreamParser.CARBON_STREAM_PARSER,
CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.start()
...
如果用户需要自定义流解析器来将特定的 InternalRow 转换成 Object[],他需要实现 CarbonStreamParser
接口的 initialize
和 parserRow
方法,比如:
package org.XXX.XXX.streaming.parser
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
class XXXStreamParserImp extends CarbonStreamParser {
override def initialize(configuration: Configuration, structType: StructType): Unit = {
// user can get the properties from "configuration"
}
override def parserRow(value: InternalRow): Array[Object] = {
// convert InternalRow to Object[](Array[Object] in Scala)
}
override def close(): Unit = {
}
}
然后将 carbon.stream.parser 属性设置成 org.XXX.XXX.streaming.parser.XXXStreamParserImp。
使用下面命令将所有流式段递交给列式格式的段,并且将流属性设置为 false,这张表将会变成一张普通的表。
ALTER TABLE streaming_table COMPACT 'close_streaming'