Spark SQL Start (2.4.3)
2022-06-22 23:03:00 【M_ O_】
Original address: https://spark.apache.org/docs/latest/sql-programming-guide.html
Overview
Spark SQL is a Spark module for processing structured data. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform additional optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API. When computing a result, the same execution engine is used, independent of which API/language you use to express the computation. This unification means that developers can easily switch back and forth between different APIs, based on which provides the most natural way to express a given transformation.
All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell, pyspark shell, or sparkR shell.
SQL
One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. For more on how to configure this feature, see the Hive Tables section. When running SQL from within another programming language, the results are returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command line or over JDBC/ODBC.
Datasets and DataFrames
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java; Python does not have support for the Dataset API. However, because of Python's dynamic nature, many of the benefits of the Dataset API are already available (for example, you can naturally access the fields of a row by name, `row.columnName`). The case for R is similar.
A DataFrame is a Dataset organized into named columns. Conceptually, it is equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows: in the Scala API, DataFrame is simply a type alias of Dataset[Row], while in the Java API a DataFrame is represented as Dataset<Row>.
Getting Started
Starting Point: SparkSession
The entry point into all functionality in Spark SQL is the SparkSession class. To create a basic SparkSession, just call SparkSession.builder():
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
SparkSession in Spark 2.0 provides built-in support for Hive features, including the ability to write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables. To use these features, you do not need to have an existing Hive setup.
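As a minimal sketch (assuming Hive support is available on the classpath of your Spark build), these features are switched on via enableHiveSupport() on the builder:

import org.apache.spark.sql.SparkSession;

SparkSession sparkWithHive = SparkSession
  .builder()
  .appName("Java Spark SQL Hive example")
  .enableHiveSupport()  // enables the Hive metastore connection, HiveQL, and Hive UDFs
  .getOrCreate();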
Creating DataFrames
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
As an example, the following creates a DataFrame based on the content of a JSON file:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
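For reference, the people.json file shipped with the Spark examples contains one JSON object per line, roughly as follows; Michael has no age field, which is why age shows up as null above:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}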
Untyped Dataset Operations (aka DataFrame Operations)
DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python, and R.
As mentioned above, in Spark 2.0 DataFrames are just Datasets of Rows in the Scala and Java APIs. These operations are also referred to as "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets.
Here are some basic examples of structured data processing using Datasets:
// Needed for the col(...) column references used below
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
For a complete list of the types of operations that can be performed on a Dataset, refer to the API documentation.
In addition to simple column references and expressions, Datasets also have a rich library of functions, including string manipulation, date arithmetic, common math operations, and more. The complete list is available in the DataFrame Function Reference.
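A brief sketch of a few of those built-in functions applied to the df created above (the column aliases are chosen only for this example):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.length;
import static org.apache.spark.sql.functions.upper;

// Combine a string function, a column expression, and aliases in one select
df.select(
    upper(col("name")).alias("name_upper"),
    length(col("name")).alias("name_length"),
    col("age").plus(10).alias("age_plus_10")
).show();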
Running SQL Queries Programmatically
The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a Dataset<Row>.
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
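The registered view accepts arbitrary SQL, not just SELECT *. For instance, a small sketch that filters out the null age and computes an expression (the variable name is just illustrative):

Dataset<Row> nextYearDF = spark.sql(
  "SELECT name, age + 1 AS age_next_year FROM people WHERE age IS NOT NULL");
nextYearDF.show();
// Expected: Andy -> 31 and Justin -> 20; Michael is dropped because his age is null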
Global Temporary View
Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. Global temporary views are tied to the system preserved database global_temp, and you must use the qualified name to refer to them, e.g. SELECT * FROM global_temp.view1.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
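When the global view is no longer needed before the application ends, it can be dropped explicitly through the catalog; a small sketch:

// Returns true if the global view existed and was dropped
spark.catalog().dropGlobalTempView("people");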
Creating Datasets
Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized encoder to serialize objects for processing or transmission over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that lets Spark perform many operations like filtering, sorting, and hashing without deserializing the bytes back into an object.
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
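Because these Datasets are strongly typed, they also support typed lambdas and can be turned back into untyped DataFrames when convenient. A short sketch using the javaBeanDS and peopleDS objects created above (FilterFunction lives in org.apache.spark.api.java.function):

import org.apache.spark.api.java.function.FilterFunction;

// Typed filter: the lambda receives Person objects rather than Rows
Dataset<Person> olderThan30 = javaBeanDS.filter((FilterFunction<Person>) p -> p.getAge() > 30);
olderThan30.show();  // keeps only Andy (age 32)
// A typed Dataset can always be converted back to an untyped DataFrame
Dataset<Row> untypedDF = peopleDS.toDF();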
Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of the RDD. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it lets you construct Datasets when the columns and their types are not known until runtime.
Inferring the Schema Using Reflection
Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain Map fields. Nested JavaBeans and List or Array fields are supported, though. You can create a JavaBean by creating a class that implements the Serializable interface and has getters and setters for all of its fields.
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Programmatically Specifying the Schema
When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset<Row> can be created programmatically with three steps:
1. Create an RDD of Rows from the original RDD;
2. Create the schema represented by a StructType matching the structure of the Rows in the RDD created in step 1;
3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
Example:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
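If the runtime metadata also carries per-field types rather than only names, the same pattern extends naturally. A minimal sketch, assuming a hypothetical name:type descriptor string (not part of the original example):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical descriptor where every field carries its own type
String typedSchemaString = "name:string,age:int";
List<StructField> typedFields = new ArrayList<>();
for (String fieldSpec : typedSchemaString.split(",")) {
  String[] nameAndType = fieldSpec.split(":");
  DataType type = "int".equals(nameAndType[1]) ? DataTypes.IntegerType : DataTypes.StringType;
  typedFields.add(DataTypes.createStructField(nameAndType[0], type, true));
}
StructType typedSchema = DataTypes.createStructType(typedFields);
// Rows built for this schema must then hold matching Java types,
// e.g. RowFactory.create(attributes[0], Integer.parseInt(attributes[1].trim()))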
Aggregations
The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), and so on. While these functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java that work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
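As a small sketch of the built-in side, the people DataFrame df from earlier can be aggregated directly (the static imports come from org.apache.spark.sql.functions):

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.countDistinct;
import static org.apache.spark.sql.functions.max;

// Average age, maximum age, and number of distinct names over the whole DataFrame
df.agg(avg("age"), max("age"), countDistinct("name")).show();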
Untyped User-Defined Aggregate Functions
Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average could look like this:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
// Data types of input arguments of this aggregate function
public StructType inputSchema() {
return inputSchema;
}
// Data types of values in the aggregation buffer
public StructType bufferSchema() {
return bufferSchema;
}
// The data type of the returned value
public DataType dataType() {
return DataTypes.DoubleType;
}
// Whether this function always returns the same output on the identical input
public boolean deterministic() {
return true;
}
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0L);
buffer.update(1, 0L);
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
// Calculates the final result
public Double evaluate(Row buffer) {
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
}
// Register the function to access it
spark.udf().register("myAverage", new MyAverage());
Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
private String name;
private long salary;
// Constructors, getters, setters...
}
public static class Average implements Serializable {
private long sum;
private long count;
// Constructors, getters, setters...
}
public static class MyAverage extends Aggregator<Employee, Average, Double> {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
public Average zero() {
return new Average(0L, 0L);
}
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
public Average reduce(Average buffer, Employee employee) {
long newSum = buffer.getSum() + employee.getSalary();
long newCount = buffer.getCount() + 1;
buffer.setSum(newSum);
buffer.setCount(newCount);
return buffer;
}
// Merge two intermediate values
public Average merge(Average b1, Average b2) {
long mergedSum = b1.getSum() + b2.getSum();
long mergedCount = b1.getCount() + b2.getCount();
b1.setSum(mergedSum);
b1.setCount(mergedCount);
return b1;
}
// Transform the output of the reduction
public Double finish(Average reduction) {
return ((double) reduction.getSum()) / reduction.getCount();
}
// Specifies the Encoder for the intermediate value type
public Encoder<Average> bufferEncoder() {
return Encoders.bean(Average.class);
}
// Specifies the Encoder for the final output value type
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+