
[Spark] How to implement Spark SQL field lineage

2022-06-26 06:12:00 Impl_ Sunny

0. Background

Field lineage means recording how each field is derived while tables are being processed. Why do we need lineage?

With field-level lineage you know where data comes from, where it goes, and how fields are transformed along the way, which is a great help for data quality and data governance.

Compared with Hive, Spark SQL is generally more efficient, with clear benefits in runtime and resource usage.

Our platform plans to migrate Hive tasks to Spark SQL, and at the same time it needs field-level lineage. Hive lineage is supported directly by Atlas, but how do we implement field lineage for Spark?

1. Extending Spark SQL

Spark supports extensions: users can customize Spark SQL's SQL parsing, logical plan analysis and checking, logical plan optimization, physical planning and so on, without touching the Spark source code, so the cost is relatively low.

1.1 What can be extended

SparkSessionExtensions is the key class here: it defines the methods for injecting rules and currently supports the following (a sketch of the corresponding API calls follows the list):

  • 【Analyzer Rules】 Logical plan analysis rules

  • 【Check Analysis Rules】 Logical plan check rules

  • 【Optimizer Rules】 Logical plan optimization rules

  • 【Planning Strategies】 Strategies for forming physical plans

  • 【Customized Parser】 A custom SQL parser

  • 【(External) Catalog listeners】 External catalog listeners
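
For reference, these extension points correspond to injection methods on SparkSessionExtensions. Below is a minimal sketch of the API shape using no-op rules (the object and value names are just for illustration; the planner-strategy and catalog hooks are left out):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object ExtensionPointsSketch {
  // No-op builders, only to show which method matches which extension point.
  val wiring: SparkSessionExtensions => Unit = { ext =>
    // Analyzer rule
    ext.injectResolutionRule { session =>
      new Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan }
    }
    // Check analysis rule: no return value, the kind used in this post
    ext.injectCheckRule { session => plan => () }
    // Optimizer rule
    ext.injectOptimizerRule { session =>
      new Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan }
    }
    // Custom SQL parser, here simply delegating to the original one
    ext.injectParser { (session, delegate) => delegate }
  }
}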

Of these six user-customizable extension points we chose 【Check Analysis Rules】, because a check rule does not return a value when it is invoked, which means it never has to modify the logical plan tree being traversed; that is exactly what we need.

【Analyzer Rules】 and 【Optimizer Rules】, by contrast, are expected to modify the current logical plan, which makes it hard for us to simply traverse the whole tree and extract the result we want.

1.2 Implementing our own extension

package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSessionExtensions

class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {
  override def apply(spark: SparkSessionExtensions): Unit = {

    // Field consanguinity 
    spark.injectCheckRule(FieldLineageCheckRuleV3)

    //sql Parser 
    spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }

  }
}

That is all it takes to implement the extension: in the apply method, inject the rules you need into the SparkSessionExtensions object. Besides the injections shown above, the other rule types listed in 1.1 can be injected in the same way.

For ExtralSparkExtension to take effect, we must set spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension in spark-defaults.conf; the extension is then applied when the Spark job starts.
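
For quick local experiments the same extension can also be registered in code through SparkSession.Builder.withExtensions instead of spark-defaults.conf. A minimal sketch (the object name is just for illustration; the lineage switch is the config key the parser below reads, left elided exactly as in the original):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.ExtralSparkExtension

object LineageLocalTest {
  def main(args: Array[String]): Unit = {
    // Registering the extension programmatically has the same effect as
    // setting spark.sql.extensions in spark-defaults.conf.
    val spark = SparkSession.builder()
      .master("local[*]")
      .withExtensions(new ExtralSparkExtension())
      // The lineage switch checked in ExtraSparkParser; its real name is elided in the post.
      .config("spark.sql.xxx-xxx-xxx.enable", "true")
      .getOrCreate()

    // Any INSERT statement issued here would go through ExtraSparkParser and trigger lineage collection.
    spark.stop()
  }
}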

Note that we also implemented a custom SQL parser, although it does not do much: it only checks whether the statement contains insert, and if so stores the SQL text (SQLText) into the thread-local FIELD_LINE_AGE_SQL. The reason for stashing the SQL text there is that we cannot get the SparkPlan inside the check rule, so we have to parse and plan the SQL again ourselves. As a result the implementation of FieldLineageCheckRuleV3 is also very simple; the important work happens in another thread.

Here we only care about insert statements, because an insert reads from some source tables and writes into a target table.

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging {

  override def parsePlan(sqlText: String): LogicalPlan = {
    val lineAgeEnabled = SparkSession.getActiveSession
      .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
    logDebug(s"SqlText: $sqlText")
    if(sqlText.toLowerCase().contains("insert")){
      if(lineAgeEnabled){
        if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
          // Thread local variables are here 
          FIELD_LINE_AGE_SQL.set(sqlText)
        }
        FIELD_LINE_AGE_SQL_COULD_SET.remove()
      }
    }
    delegate.parsePlan(sqlText)
  }
  // Call original sqlparser
  override def parseExpression(sqlText: String): Expression = {
    delegate.parseExpression(sqlText)
  }
  // Call original sqlparser
  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    delegate.parseTableIdentifier(sqlText)
  }
  // Call original sqlparser
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    delegate.parseFunctionIdentifier(sqlText)
  }
  // Call original sqlparser
  override def parseTableSchema(sqlText: String): StructType = {
    delegate.parseTableSchema(sqlText)
  }
  // Call original sqlparser
  override def parseDataType(sqlText: String): DataType = {
    delegate.parseDataType(sqlText)
  }
}
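
The thread-local holders FIELD_LINE_AGE_SQL and FIELD_LINE_AGE_SQL_COULD_SET used above are not declared anywhere in the post. A minimal sketch of what they could look like, assuming the flag defaults to true so that the first qualifying parse on a thread captures the SQL (the object name is hypothetical):

object LineageThreadLocals {
  // Holds the raw INSERT statement captured by ExtraSparkParser for the current thread.
  val FIELD_LINE_AGE_SQL: ThreadLocal[String] = new ThreadLocal[String]()

  // Per-thread flag consulted by ExtraSparkParser before it stores the SQL text;
  // assumed to start out as true.
  val FIELD_LINE_AGE_SQL_COULD_SET: ThreadLocal[Boolean] = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = true
  }
}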

1.3 The check rule class


import java.util.concurrent.ThreadPoolExecutor

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.util.ThreadUtils

case class FieldLineageCheckRuleV3(sparkSession: SparkSession) extends (LogicalPlan => Unit) {

  val executor: ThreadPoolExecutor =
    ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)

  override def apply(plan: LogicalPlan): Unit = {
    val sql = FIELD_LINE_AGE_SQL.get
    FIELD_LINE_AGE_SQL.remove()
    if(sql != null){
      // Here we get sql Then start a thread to do the remaining parsing tasks 
      val task = new FieldLineageRunnableV3(sparkSession,sql)
      executor.execute(task)
    }

  }
}

This is simple: we just take the captured SQL and start a thread to obtain the SparkPlan; the real logic lives in FieldLineageRunnableV3.
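
FieldLineageRunnableV3 itself is not listed in full; from the call above, its outline is roughly the following sketch, with the interesting part, run(), shown step by step in section 1.4:

import org.apache.spark.sql.SparkSession

// Sketch of the collector's outline; only the contents of run() are discussed in the post.
class FieldLineageRunnableV3(sparkSession: SparkSession, sql: String) extends Runnable {
  override def run(): Unit = {
    // 1. re-parse, analyze, optimize and plan the captured SQL to obtain a SparkPlan
    // 2. walk the SparkPlan to collect projection and predicate lineage
    // 3. emit the resulting vertices/edges (see the JSON example at the end of the post)
  }
}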

1.4 Implementation details

1.4.1 Obtaining the SparkPlan

We obtain the SparkPlan in the run method:


override def run(): Unit = {
  val parser = sparkSession.sessionState.sqlParser
  val analyzer = sparkSession.sessionState.analyzer
  val optimizer = sparkSession.sessionState.optimizer
  val planner = sparkSession.sessionState.planner
      ............
  val newPlan = parser.parsePlan(sql)
  PASS_TABLE_AUTH.set(true)
  val analyzedPlan = analyzer.executeAndCheck(newPlan)

  val optimizerPlan = optimizer.execute(analyzedPlan)
  // obtain sparkPlan
  val sparkPlan = planner.plan(optimizerPlan).next()
  ...............
if(targetTable != null){
  val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
  val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
  //projection
  projectionLineAge(levelProject, sparkPlan.child)
  //predication
  predicationLineAge(predicates, sparkPlan.child)
  ...............

Why use the SparkPlan (the physical plan)? In our view, extracting the field relationships from the physical plan is more accurate, and the chain is shorter and more direct.

As a quick recap, Spark SQL parses a statement as follows:

The SqlParser produces a logical plan in which table names, functions and so on are not yet resolved, so it cannot be executed; the Analyzer then binds metadata such as table existence, column information and function information; the Optimizer rewrites the logical plan according to predefined rules (RBO; Spark also supports CBO); finally the SparkPlanner turns it into an executable physical plan.
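
As a side note, if you want to look at these phases for a query of your own, the intermediate plans are exposed on queryExecution; assuming an active SparkSession named spark, a small illustration (independent of the lineage code):

// Illustration: printing each planning phase of an arbitrary query.
val qe = spark.sql("select item_id, v_value from t1").queryExecution
println(qe.logical)        // unresolved logical plan (output of the parser)
println(qe.analyzed)       // resolved logical plan (after the Analyzer)
println(qe.optimizedPlan)  // optimized logical plan (after the Optimizer)
println(qe.sparkPlan)      // physical plan (after the SparkPlanner)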

Let's compare a logical plan with a physical plan using an example:

The SQL statement:

select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

Logical plan:

Physical plan:

Clearly, the physical plan is much simpler.

Once we have the SparkPlan, we can walk it and handle each kind of SparkPlan node accordingly.

We divide field lineage into two types: projection (the fields in the select list) and predication (the where conditions).

Both are point-to-point relationships, i.e. a mapping from the fields of the source tables to the fields of the target table.

Think of a query as a tree: the iteration starts from the top of the tree and goes down to the leaf nodes, and the leaf nodes are the source tables.

The result of iterating over such a query should be:

id   -> tab1.id
name -> tab1.name, tabb2.name
age  -> tabb2.age

Note the variable val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]](); after the projectionLineAge iteration, the top level of levelProject stores the sources of id, name and age: (tab1.id), (tab1.name, tabb2.name), (tabb2.age).
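
projectionLineAge itself is not shown in the post. Conceptually it descends the physical plan and, at each projection node, records which input attributes every output column references. A heavily simplified sketch of that idea, using Spark's ProjectExec and plain Attribute sequences instead of the post's NameExpressionHolder, and ignoring the special cases mentioned next:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

object ProjectionLineageSketch {
  // One inner ArrayBuffer per projection level; each entry holds the source attributes
  // that a single output column is computed from.
  def collectProjectionLevels(
      levels: ArrayBuffer[ArrayBuffer[Seq[Attribute]]],
      plan: SparkPlan): Unit = {
    plan match {
      case p: ProjectExec =>
        val level = new ArrayBuffer[Seq[Attribute]]()
        p.projectList.foreach { namedExpr =>
          level += namedExpr.references.toSeq // every attribute this output column depends on
        }
        levels += level
      case _ => // other node types are skipped in this sketch
    }
    plan.children.foreach(child => collectProjectionLevels(levels, child))
  }
}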

Of course, it is not just a simple recursive traversal; special cases such as Join, ExpandExec, Aggregate, Explode, GenerateExec and so on need special handling.

Example and result:

SQL:

with A as (select id,name,age from tab1 where id > 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
   select C.id,concat(C.name,B.name) as name, B.age from
     B,C where C.id = B.id

Result:


{
  "edges": [
    {
      "sources": [
        3
      ],
      "targets": [
        0
      ],
      "expression": "id",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        4,
        7
      ],
      "targets": [
        1
      ],
      "expression": "name",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        5
      ],
      "targets": [
        2
      ],
      "expression": "age",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        6,
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "INNER",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [
        6,
        5
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
      "edgeType": "PREDICATE"
    }
  ],
  "vertices": [
    {
      "id": 0,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.id"
    },
    {
      "id": 1,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.name"
    },
    {
      "id": 2,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.age"
    },
    {
      "id": 3,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.id"
    },
    {
      "id": 4,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.name"
    },
    {
      "id": 5,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.age"
    },
    {
      "id": 6,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.id"
    },
    {
      "id": 7,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.name"
    }
  ]
}

2. Summary

To implement field lineage in Spark SQL we relied on its extension mechanism: we first capture the insert statement, retrieve the SQL text inside our own check rule, and run it through the SparkSqlParser, Analyzer, Optimizer and SparkPlanner to finally obtain the physical plan.

We then iterate over the physical plan, apply the appropriate conversion for each kind of plan node, and end up with the field-to-field mappings. The current implementation is fairly simple: each mapping is a straight line from source field to target field and the intermediate steps are ignored, but tracing the full transformation process of a field would also be possible with this approach.
