Current location: Home > Query the information of students whose grades are above 80
Query the information of students whose grades are above 80
2022-07-25 03:01:00 【Rookies also have dreams】
Java:
package cn.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.In;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Spark SQL JSON data-source demo (Java version).
 *
 * <p>Reads student scores from a JSON file on HDFS and selects the students whose score is
 * greater than 80. It then looks up their basic information (age) in a small in-memory JSON
 * dataset, joins score and age by name, and writes the combined records back to HDFS as JSON.
 */
public class JSONDataSource {

    /**
     * Maps a row whose column 0 is a string name and column 1 is a JSON integer to a
     * (name, int value) pair. JSON integers are inferred by Spark as LongType, hence
     * {@code getLong}. The value is narrowed to int to match the IntegerType schema used
     * for the output.
     */
    private static final PairFunction<Row, String, Integer> NAME_TO_INT_VALUE =
            new PairFunction<Row, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Row row) throws Exception {
                    return new Tuple2<String, Integer>(row.getString(0), (int) row.getLong(1));
                }
            };

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setAppName("JSONDataSource");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            SQLContext sqlContext = new SQLContext(sc);

            // Build a DataFrame from the JSON file of student scores.
            // NOTE(review): the input is read from ".../spark-study/..." while the output below
            // goes to ".../spark_study/..."; confirm which directory name is intended.
            DataFrame studentScoreDF = sqlContext.read().json(
                    "hdfs://master:9000/spark-study/students.json"
            );

            // Register the scores as a temporary table and select the students scoring above 80.
            // The registered name must match the table name used in the query.
            studentScoreDF.registerTempTable("student_score");
            DataFrame goodStudentScoresDF = sqlContext.sql("select name,score from student_score where score>80");

            // Collect the names of the good students to the driver.
            List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(new Function<Row, String>() {
                @Override
                public String call(Row row) throws Exception {
                    return row.getString(0);
                }
            }).collect();

            // Build a DataFrame of basic student information from an in-memory RDD of JSON strings.
            List<String> studentInfoJSONs = new ArrayList<String>();
            studentInfoJSONs.add("{\"name\":\"leo\",\"age\":18}");
            studentInfoJSONs.add("{\"name\":\"Marry\",\"age\":17}");
            studentInfoJSONs.add("{\"name\":\"Jack\",\"age\":19}");
            JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
            DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);

            // Keep only the basic information of the good students. Filtering with Column.isin
            // avoids concatenating names into an SQL "in (...)" string. Such a string is
            // malformed for an empty list and breaks on names containing a quote.
            DataFrame goodStudentInfosDF = studentInfosDF
                    .filter(studentInfosDF.col("name").isin(goodStudentNames.toArray()))
                    .select("name", "age");

            // Pair each good student's score (from the score DF) with their age (from the
            // info DF) by joining the two (name, value) pair RDDs.
            JavaPairRDD<String, Integer> scoresByName = goodStudentScoresDF.javaRDD().mapToPair(NAME_TO_INT_VALUE);
            JavaPairRDD<String, Integer> agesByName = goodStudentInfosDF.javaRDD().mapToPair(NAME_TO_INT_VALUE);
            JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentRDD = scoresByName.join(agesByName);

            // Flatten (name, (score, age)) into Rows of (name, score, age).
            JavaRDD<Row> goodStudentRowsRDD = goodStudentRDD.map(
                    new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
                        @Override
                        public Row call(Tuple2<String, Tuple2<Integer, Integer>> nameScoreAge) throws Exception {
                            return RowFactory.create(nameScoreAge._1, nameScoreAge._2._1, nameScoreAge._2._2);
                        }
                    });

            // Schema for the joined rows. Score and age hold Integers, so the schema must
            // declare IntegerType rather than StringType.
            List<StructField> structFields = new ArrayList<StructField>();
            structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
            structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
            StructType structType = DataTypes.createStructType(structFields);
            DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);

            // Save the complete good-student records (name, score, age) as JSON.
            goodStudentsDF.write().format("json").save("hdfs://master:9000/spark_study/java/good-students");
        } finally {
            sc.stop();
        }
    }
}
Scala:
package cn.spark.sql
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark SQL JSON data-source demo (Scala version).
 *
 * Reads student scores from a JSON file on HDFS and selects the students whose score is
 * greater than 80. It then looks up their basic information (age) in a small in-memory JSON
 * dataset, joins score and age by name, and writes the combined records back to HDFS as JSON.
 */
object JSONDataSource {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JSONDataSource")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)

      // Create the student-score DataFrame and expose it as a temporary table.
      // registerTempTable returns Unit, so the DataFrame is kept in its own val.
      val studentScoresDF = sqlContext.read.json("hdfs://master:9000/spark_study/students.json")
      studentScoresDF.registerTempTable("student_score")

      // Students scoring above 80, and their names collected to the driver.
      val goodStudentScoresDF = sqlContext.sql("select name,score from student_score where score>80")
      val goodStudentNames = goodStudentScoresDF.rdd.map(_.getAs[String]("name")).collect()

      // Create the basic-information DataFrame from in-memory JSON strings.
      val studentInfoJSONs = Array(
        "{\"name\":\"Leo\", \"age\":18}",
        "{\"name\":\"Marry\", \"age\":17}",
        "{\"name\":\"Jack\", \"age\":19}")
      val studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs, 3)
      val studentInfosDF = sqlContext.read.json(studentInfoJSONsRDD)

      // Keep only the basic information of the good students. Column.isin avoids building an
      // SQL "in (...)" string, which is malformed for an empty name list and breaks on a
      // name containing a quote.
      val goodStudentInfosDF = studentInfosDF
        .filter(studentInfosDF("name").isin(goodStudentNames: _*))
        .select("name", "age")

      // Join each good student's score (from the score DF) with their age (from the info DF)
      // by name. JSON integers are inferred as Long; they are narrowed to Int below.
      val scoresByName = goodStudentScoresDF.rdd.map { row =>
        (row.getAs[String]("name"), row.getAs[Long]("score"))
      }
      val agesByName = goodStudentInfosDF.rdd.map { row =>
        (row.getAs[String]("name"), row.getAs[Long]("age"))
      }
      val goodStudentRDD = scoresByName.join(agesByName)

      // Convert the joined RDD into Rows of (name, score, age), then into a DataFrame.
      val goodStudentRowsRDD = goodStudentRDD.map { case (name, (score, age)) =>
        Row(name, score.toInt, age.toInt)
      }
      val schema = StructType(Array(
        StructField("name", StringType, nullable = true),
        StructField("score", IntegerType, nullable = true),
        StructField("age", IntegerType, nullable = true)))
      val goodStudentDF = sqlContext.createDataFrame(goodStudentRowsRDD, schema)

      // Save the DataFrame as JSON.
      // NOTE(review): output goes under /root/spark_study while the input is read from
      // /spark_study; confirm the intended directory.
      goodStudentDF.write.format("json").save("hdfs://master:9000/root/spark_study/scala/good-students-scala")
    } finally {
      sc.stop()
    }
  }
}
Sidebar recommendations
- Markdown learning
- Dc-1-practice
- Wechat H5 record
- Color space (1) - RGB
- Vulntarget vulnerability shooting range -vulntarget-b
- Solve the error: could not find 'xxxtest'
- Hyperchain hyperblockchain Shi Xingguo was interviewed by 36 krypton: the amount of customer cooperation consulting is doubling
- Use unicloud cloud function to decode wechat motion steps in applet
- If there is a segment in the encryption field, are you "bronze" or "King"?
- [jailhouse article] scheduling policies and system software architectures for mixed criticality
You may also like

Use of stm32cubemonitor part I - data plotting and instrument display
![[pyGame practice] nostalgic classic - do you remember the name of this chess game for children? (must collect)](/img/b3/075ad2d555118272efede5a9969319.png)
[pyGame practice] nostalgic classic - do you remember the name of this chess game for children? (must collect)

Selenium framework: running the stealth.min.js file to hide browser fingerprint features

TP5.1 initialize initialization method (not _initialize)

Riotboard development board series notes (VII) -- the use of framebuffer

SQL Server 2022 installation

JS foundation -- regular expression

Wechat sports field reservation of applet completion works applet graduation design (8) graduation design thesis template

JS written test question -- deep copy of object

Use of stm32cubemonitor Part II - historical data storage and network access
Random recommendations
IO (1) -io layering
Simulation Implementation of string function (Part 1)
[pyGame practice] nostalgic classic - do you remember the name of this chess game for children? (must collect)
Keil compile download error: no algorithm found for: 08000000h - 08001233h solution
JS written test questions -- random numbers, array deduplication
Learning record Xi
Learning record 10
Arduino + si5351 square wave generator
Use of stm32cubemonitor part I - data plotting and instrument display
Color space (1) - RGB
Matlab for circular pit
Strategy mode, just read one article
Publish the project online and don't want to open a port
Several dpdk control frameworks
Time formatting
Pypi counts the number of Downloads
"Introduction to interface testing" punch in day08: can you save all parameters to excel for test data?
Domain driven model (DDD)
Beginners must see the markdown User Guide
JS construction linked list