[Spark] How to implement Spark SQL field-level lineage
2022-06-26 06:12:00 【Impl_Sunny】
0. Background
Field-level lineage records how each field is derived while tables are processed. Why do we need lineage? With field lineage we know where data comes from, where it goes, and how fields are transformed along the way, which is a great help for data quality and data governance.
Spark SQL is generally more efficient than Hive and brings clear benefits in runtime and resource usage.
Our platform plans to migrate Hive tasks to Spark SQL, and at the same time we need to implement field lineage. Hive lineage is supported directly by Atlas; how do we implement field-level lineage for Spark?
1. Spark SQL extensions
Spark provides extension points that let users customize Spark SQL's SQL parsing, logical-plan analysis and checking, logical-plan optimization, physical-plan generation, and more, all without modifying Spark's source code, so the cost is relatively low.
1.1 What Spark can extend
SparkSessionExtensions is the key class; it defines the methods for injecting rules and currently supports the following:
【Analyzer Rules】 logical-plan analysis rules
【Check Analysis Rules】 logical-plan check rules
【Optimizer Rules】 logical-plan optimization rules
【Planning Strategies】 strategies for generating physical plans
【Customized Parser】 custom SQL parser
【(External) Catalog listeners】 catalog listeners
Of these six user-customizable extension points, we chose 【Check Analysis Rules】. A check rule does not need to return anything when it is invoked, which means it does not have to modify the logical plan tree being traversed; that is exactly what we need.
【Analyzer Rules】 and 【Optimizer Rules】, on the other hand, must return a (possibly modified) logical plan, which makes it hard to simply walk the whole tree and collect the results we want.
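To make the difference concrete, here is a minimal sketch based on the SparkSessionExtensions builder signatures as we understand them; the rule bodies are illustrative only.

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A check rule only observes the analyzed plan and returns Unit ...
val observeOnly: SparkSession => LogicalPlan => Unit =
  session => plan => println(s"check rule saw: ${plan.nodeName}")

// ... while an optimizer rule must hand back a (possibly rewritten) LogicalPlan.
val mustReturnPlan: SparkSession => Rule[LogicalPlan] =
  session => new Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

val configure: SparkSessionExtensions => Unit = { ext =>
  ext.injectCheckRule(observeOnly)        // fits read-only lineage collection
  ext.injectOptimizerRule(mustReturnPlan) // would force us to return a plan
}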
1.2 Implementing our own extension
class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {
  override def apply(spark: SparkSessionExtensions): Unit = {
    // field lineage
    spark.injectCheckRule(FieldLineageCheckRuleV3)
    // SQL parser
    spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }
  }
}

The extension is implemented as shown above: in the apply method we inject the rules we need into SparkSessionExtensions. Besides the injections shown here, the other rule types listed earlier can be injected in the same way.
For ExtralSparkExtension to take effect we need to configure spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension in spark-defaults.conf; it will then be applied when the Spark job starts.
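As an alternative, the same setting can be supplied when building the session. A minimal sketch, assuming the jar containing ExtralSparkExtension is on the classpath; the app name and the sample INSERT are illustrative only.

import org.apache.spark.sql.SparkSession

// Enable the extension per session instead of via spark-defaults.conf.
val spark = SparkSession.builder()
  .appName("field-lineage-demo")
  .config("spark.sql.extensions", "org.apache.spark.sql.hive.ExtralSparkExtension")
  .getOrCreate()

// Every statement parsed by this session now goes through ExtraSparkParser, so the
// INSERT below would be captured for lineage collection.
spark.sql("insert into tab3 select id, name, age from tab1")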
Notice that we also implemented a custom SQL parser, although the parser itself does not do much: when the statement contains insert, it stores the SQL text in FIELD_LINE_AGE_SQL. The reason for stashing the SQL text in FIELD_LINE_AGE_SQL is that the SparkPlan cannot be obtained inside the check rule, so we need to parse the SQL again later to get the SparkPlan. The implementation of FieldLineageCheckRuleV3 is therefore very simple; the important work happens in a separate thread.
We only care about insert statements here, because an insert statement reads from some tables and writes into another table.
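The article never shows how FIELD_LINE_AGE_SQL and FIELD_LINE_AGE_SQL_COULD_SET are defined; a hypothetical definition, inferred from how the parser and check rule use them, might look like this.

// Hypothetical thread-local holders (not shown in the article); names and semantics are
// inferred from how ExtraSparkParser and FieldLineageCheckRuleV3 use them.
object FieldLineageHolder {
  // Carries the raw INSERT SQL text from the parser to the check rule on the same thread.
  val FIELD_LINE_AGE_SQL: ThreadLocal[String] = new ThreadLocal[String]()

  // Prevents nested parsePlan calls from overwriting the SQL captured for the outer statement.
  val FIELD_LINE_AGE_SQL_COULD_SET: ThreadLocal[Boolean] = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = true
  }
}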
class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging {

  override def parsePlan(sqlText: String): LogicalPlan = {
    val lineAgeEnabled = SparkSession.getActiveSession
      .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
    logDebug(s"SqlText: $sqlText")
    if (sqlText.toLowerCase().contains("insert")) {
      if (lineAgeEnabled) {
        if (FIELD_LINE_AGE_SQL_COULD_SET.get()) {
          // thread-local variable
          FIELD_LINE_AGE_SQL.set(sqlText)
        }
        FIELD_LINE_AGE_SQL_COULD_SET.remove()
      }
    }
    delegate.parsePlan(sqlText)
  }

  // delegate to the original parser
  override def parseExpression(sqlText: String): Expression = {
    delegate.parseExpression(sqlText)
  }

  // delegate to the original parser
  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    delegate.parseTableIdentifier(sqlText)
  }

  // delegate to the original parser
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    delegate.parseFunctionIdentifier(sqlText)
  }

  // delegate to the original parser
  override def parseTableSchema(sqlText: String): StructType = {
    delegate.parseTableSchema(sqlText)
  }

  // delegate to the original parser
  override def parseDataType(sqlText: String): DataType = {
    delegate.parseDataType(sqlText)
  }
}

1.3 The extension rule class
case class FieldLineageCheckRuleV3(sparkSession: SparkSession) extends (LogicalPlan => Unit) {

  val executor: ThreadPoolExecutor =
    ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector", 3, 6)

  override def apply(plan: LogicalPlan): Unit = {
    val sql = FIELD_LINE_AGE_SQL.get
    FIELD_LINE_AGE_SQL.remove()
    if (sql != null) {
      // once we have the SQL, hand the remaining parsing work to a separate thread
      val task = new FieldLineageRunnableV3(sparkSession, sql)
      executor.execute(task)
    }
  }
}

The rule is simple: we just grab the SQL and start a thread to obtain the SparkPlan. The actual logic lives in FieldLineageRunnableV3.
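The article does not show the class declaration of FieldLineageRunnableV3; a hypothetical skeleton consistent with executor.execute(task) would simply be a Runnable carrying the session and the SQL.

import org.apache.spark.sql.SparkSession

// Hypothetical skeleton (only its run method is shown in the article, in 1.4.1 below):
// a plain Runnable carrying the session and the captured SQL into the worker thread.
class FieldLineageRunnableV3(sparkSession: SparkSession, sql: String) extends Runnable {
  override def run(): Unit = {
    // parse -> analyze -> optimize -> plan, then walk the resulting SparkPlan (see 1.4.1)
  }
}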
1.4 Implementation details
1.4.1 Obtaining the SparkPlan
We obtain the SparkPlan in the run method:
override def run(): Unit = {
  val parser = sparkSession.sessionState.sqlParser
  val analyzer = sparkSession.sessionState.analyzer
  val optimizer = sparkSession.sessionState.optimizer
  val planner = sparkSession.sessionState.planner
  ............
  val newPlan = parser.parsePlan(sql)
  PASS_TABLE_AUTH.set(true)
  val analyzedPlan = analyzer.executeAndCheck(newPlan)
  val optimizerPlan = optimizer.execute(analyzedPlan)
  // obtain the SparkPlan
  val sparkPlan = planner.plan(optimizerPlan).next()
  ...............
  if (targetTable != null) {
    val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
    val predicates = new ArrayBuffer[(String, ArrayBuffer[NameExpressionHolder])]()
    // projection
    projectionLineAge(levelProject, sparkPlan.child)
    // predication
    predicationLineAge(predicates, sparkPlan.child)
  ...............

Why use the SparkPlan? After weighing the options, we found that extracting field relationships from the physical plan is more accurate, and the path to them is shorter and more direct.
As a side note, the Spark SQL parsing process is as follows:

SqlParser produces a logical plan in which table names, functions, and so on are not yet resolved, so it cannot be executed yet. The Analyzer then binds this information, for example validating tables and resolving column and function metadata. The Optimizer then rewrites the logical plan according to a set of rules (RBO; Spark also supports CBO). Finally, SparkPlanner turns it into an executable physical plan.
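These stages can also be inspected directly through QueryExecution; a small sketch, assuming a SparkSession named spark and existing tables t1 and t2:

// QueryExecution exposes the intermediate plans of the pipeline described above.
val qe = spark.sql(
  """select item_id, TYPE, v_value, imei from t1
    |union all
    |select item_id, TYPE, v_value, imei from t2""".stripMargin
).queryExecution

println(qe.logical)       // unresolved logical plan produced by the SqlParser
println(qe.analyzed)      // after the Analyzer resolves tables, columns and functions
println(qe.optimizedPlan) // after the rule-based Optimizer
println(qe.sparkPlan)     // physical plan selected by the SparkPlanner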
Let's look at an example comparing a logical plan with a physical plan.
A SQL statement:
select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

Logical plan:

Physical plan:

Clearly the physical plan is much simpler.
Once we have the SparkPlan, we can iterate over its nodes and handle each node type accordingly.
We divide field lineage into two kinds: projection (the fields in the select list) and predicate (the conditions in the where clause).
Both are point-to-point relationships, mapping each field of the target table back to the source-table fields it is derived from.
Think of a query as a tree: the iteration starts at the top of the tree and works down to the leaf nodes, which are the source tables:

For the example above, the result of the iteration would be:
id -> tab1.id,
name -> tab1.name, tabb2.name,
age -> tabb2.age.
Notice the variable val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]](): after iterating with projectionLineAge, the top level of levelProject stores the sources of id, name, and age, namely (tab1.id), (tab1.name, tabb2.name), (tabb2.age).
Of course this is not a simple recursive walk: special cases such as Join, ExpandExec, Aggregate, Explode, and GenerateExec each need dedicated handling.
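To illustrate the idea only (not the article's implementation), a stripped-down projection walk over a SparkPlan could look like the following; ProjectExec and AttributeReference are real Spark classes, but the function itself is a hypothetical simplification that ignores all the special cases just mentioned.

import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// For each output column of the first ProjectExec found, collect the attributes it references.
// The article's real projectionLineAge also handles Join, ExpandExec, Aggregate,
// Explode/GenerateExec and tracks fully qualified source columns.
def topLevelProjection(plan: SparkPlan): Seq[(String, Seq[String])] = plan match {
  case p: ProjectExec =>
    p.projectList.map { named =>
      val sources = named.collect { case a: AttributeReference => a.name }
      named.name -> sources
    }
  case other =>
    // no projection at this node: keep searching the children
    other.children.flatMap(topLevelProjection)
}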
Example and result:
SQL:
with A as (select id,name,age from tab1 where id > 100),
C as (select id,name,max(age) from A group by A.id,A.name),
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
select C.id, concat(C.name,B.name) as name, B.age
from B, C where C.id = B.id

Result:
{
"edges": [
{
"sources": [
3
],
"targets": [
0
],
"expression": "id",
"edgeType": "PROJECTION"
},
{
"sources": [
4,
7
],
"targets": [
1
],
"expression": "name",
"edgeType": "PROJECTION"
},
{
"sources": [
5
],
"targets": [
2
],
"expression": "age",
"edgeType": "PROJECTION"
},
{
"sources": [
6,
3
],
"targets": [
0,
1,
2
],
"expression": "INNER",
"edgeType": "PREDICATE"
},
{
"sources": [
6,
5
],
"targets": [
0,
1,
2
],
"expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
"edgeType": "PREDICATE"
},
{
"sources": [
3
],
"targets": [
0,
1,
2
],
"expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
"edgeType": "PREDICATE"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "default.tab3.id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "default.tab3.name"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "default.tab3.age"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "default.tab1.id"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "default.tab1.name"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.age"
},
{
"id": 6,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.id"
},
{
"id": 7,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.name"
}
]
}

2. Summary
To implement field-level lineage in Spark SQL we rely on Spark's own extension mechanism: we first capture the insert statement, read the SQL text inside our own check rule, and then run it through SparkSqlParser, Analyzer, Optimizer, and SparkPlanner to finally obtain the physical plan.
We iterate over the physical plan, applying the appropriate conversion for each kind of execution node, and from that derive the field-to-field mappings. The current implementation is fairly simple: each mapping is a straight line from source field to target field and the intermediate steps are ignored, but capturing the whole transformation chain of each field would also be feasible.