Spark SQL Getting Started (2.4.3)
2022-06-22 20:46:00 【M_O_】
Original article: https://spark.apache.org/docs/latest/sql-programming-guide.html
Overview
Spark SQL is a Spark module for processing structured data. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API. The same execution engine is used when computing a result, regardless of which API/language you use to express the computation. This unification means that developers can easily switch back and forth between different APIs, based on which provides the most natural way to express a given transformation.
All of the example code on this page uses sample data included in the Spark distribution and can be run in the spark-shell, pyspark shell, or sparkR shell.
SQL
One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. For details on how to configure this feature, please refer to the Hive Tables section. When running SQL from another programming language, the results are returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command line or over JDBC/ODBC.
Datasets and DataFrames
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java; Python does not support the Dataset API. However, due to Python's dynamic nature, many of the benefits of the Dataset API are already available (for example, you can naturally access the field of a row by name, row.columnName). The case for R is similar.
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide range of sources, such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows: in the Scala API, DataFrame is simply Dataset[Row], while in the Java API Dataset<Row> is used to represent a DataFrame.
Getting Started
Starting Point: SparkSession
The entry point into Spark SQL is the SparkSession class. To create a basic SparkSession, just call SparkSession.builder():
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
SparkSession in Spark 2.0 provides built-in support for Hive features, including the ability to write queries using HiveQL, access Hive UDFs, and read data from Hive tables. You do not need an existing Hive installation to use these features.
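As a minimal sketch (not part of the original guide), Hive support is turned on through the same builder; the warehouse directory below is only a hypothetical placeholder:

import org.apache.spark.sql.SparkSession;

// Sketch only: enableHiveSupport() adds HiveQL, Hive UDF access and Hive table reads.
// "spark.sql.warehouse.dir" is a hypothetical local path, adjust it to your environment.
SparkSession hiveSpark = SparkSession
  .builder()
  .appName("Java Spark SQL Hive example")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .enableHiveSupport()
  .getOrCreate();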
Creating DataFrames
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
As an example, the following creates a DataFrame based on the content of a JSON file:
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Untyped Dataset Operations (aka DataFrame Operations)
DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python, and R.
As mentioned above, in Spark 2.0 DataFrames are just Datasets of Rows in the Scala and Java API. These operations are also called "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets.
Here are some basic examples of structured data processing using Datasets:
// The col(...) helper used below comes from org.apache.spark.sql.functions
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
For a complete list of the types of operations that can be performed on a Dataset, refer to the API Documentation.
In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.
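As an illustrative sketch (not from the original guide), a few of these functions applied to the people DataFrame df used above; the column aliases are arbitrary:

import static org.apache.spark.sql.functions.*;

// Sketch: one string function, one math function and one date function in a single select.
df.select(
    upper(col("name")).alias("name_upper"),          // string manipulation
    round(col("age").divide(10)).alias("decade"),     // common math
    current_date().alias("today")                     // date handling
  ).show();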
Running SQL Queries Programmatically
The sql function on a SparkSession lets applications run SQL queries programmatically and returns the result as a Dataset<Row>.
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Global Temporary View
Temporary views in Spark SQL are scoped to the Spark session and disappear if the session that created them terminates. If you want a temporary view that is shared among all sessions and stays alive until the Spark application terminates, you can create a global temporary view. Global temporary views are tied to the system-preserved database global_temp, and you must use the qualified name to refer to them, e.g. SELECT * FROM global_temp.view1.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Creating Datasets
Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize objects for processing or transmission over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are dynamically generated code and use a format that allows Spark to perform many operations such as filtering, sorting, and hashing without deserializing the bytes back into an object.
public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second method for creating Datasets is through a programmatic interface that lets you construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.
Inferring the Schema Using Reflection
Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain Map fields; nested JavaBeans and List or Array fields are supported, though. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields.
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Programmatically Specifying the Schema
When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset<Row> can be created programmatically with three steps:
1. Create an RDD of Rows from the original RDD;
2. Create the schema represented by a StructType matching the structure of the Rows in the RDD created in step 1;
3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
For example:
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
Aggregations
The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), and so on. While these functions are designed for DataFrames, Spark SQL also has type-safe versions of some of them in Scala and Java that work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
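As a quick hedged sketch (again using the people DataFrame df from the earlier examples, so the output is illustrative only), several built-in aggregates can be computed in a single pass:

import static org.apache.spark.sql.functions.*;

// Sketch: built-in aggregate functions combined in one agg(...) call.
df.agg(
    count("name").alias("people"),
    countDistinct("age").alias("distinct_ages"),
    avg("age").alias("avg_age"),
    max("age").alias("max_age"),
    min("age").alias("min_age")
  ).show();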
Untyped User-Defined Aggregate Functions
Users must extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like this:
public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }

  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }

  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }

  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }

  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }

  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }

  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }

  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }

  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}
// Register the function to access it
spark.udf().register("myAverage", new MyAverage());
Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like this:
public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...
}

public static class Average implements Serializable {
  private long sum;
  private long count;

  // Constructors, getters, setters...
}

public static class MyAverage extends Aggregator<Employee, Average, Double> {

  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }

  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }

  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }

  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }

  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+