Learn about the Spark projects on Nebula Graph
2022-07-25 08:55:00 【InfoQ】
The three Spark subprojects of Nebula Graph
- Nebula Spark Connector is a Spark library that enables Spark applications to read graph data from, and write graph data to, Nebula Graph in DataFrame form (a minimal write sketch follows this list).
- Nebula Exchange is built on top of the Nebula Spark Connector. It is a Spark library and, at the same time, a JAR application that can be executed directly via spark-submit. Its design goal is to exchange data between different data sources and Nebula Graph (one-way, i.e. writing into Nebula Graph, for the open-source version; two-way for the enterprise version). Nebula Exchange supports many kinds of data sources, such as MySQL, Neo4j, PostgreSQL, ClickHouse, Hive, and so on. Besides writing directly into Nebula Graph, it can also generate SST files and ingest them into Nebula Graph, so that computing power outside the cluster helps with sorting the underlying data.
- Nebula Algorithm is built on top of the Nebula Spark Connector and GraphX. It is likewise a Spark library and a Spark application, and it is used to run common graph algorithms (PageRank, LPA, etc.) on graphs stored in Nebula Graph.
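To make the write path concrete, here is a minimal sketch of writing vertices with the connector's builder-style API (the same style as the read examples below). The DataFrame contents, the addresses, and the exact set of write options are assumptions and may vary between connector versions; treat it as a sketch, not the definitive API usage.

import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import org.apache.spark.sql.SparkSession

object NebulaSparkWriterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // Hypothetical DataFrame: one row per vertex, columns "id", "name", "age".
    val df = spark.createDataFrame(Seq(("player900", "Some Player", 30)))
      .toDF("id", "name", "age")

    // Connection config; writing also needs the graphd address (addresses are assumptions).
    val config = NebulaConnectionConfig
      .builder()
      .withMetaAddress("metad0:9559")
      .withGraphAddress("graphd:9669")
      .withConenctionRetry(2)
      .build()

    // Map the DataFrame onto the "player" tag: the "id" column becomes the vertex ID,
    // the remaining columns become tag properties.
    val writeVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("basketballplayer")
      .withTag("player")
      .withVidField("id")
      .withBatch(256)
      .build()

    df.write.nebula(config, writeVertexConfig).writeVertices()
    spark.close()
  }
}

Writing edges works analogously through a WriteNebulaEdgeConfig builder and writeEdges().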
Nebula Spark Connector
- Code: https://github.com/vesoft-inc/nebula-spark-connector
- Docs: https://docs.nebula-graph.io/3.1.0/nebula-spark-connector/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/
- Code example: the example directory in the repository
Nebula Graph Spark Reader
Taking the player vertices as an example: configure withLabel("player") and withReturnCols(List("name", "age")), then call spark.read.nebula(...).loadVerticesToDF():
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
Getting started with the Nebula Spark Connector
Bring up the environment
# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
# Connect to Nebula Graph with the console
~/.nebula-up/console.sh
# Execute any queries, like
~/.nebula-up/console.sh -e "SHOW HOSTS"
# Load the sample dataset
~/.nebula-up/load-basketballplayer-dataset.sh
# Wait a minute or so
# Make a graph query against the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'
Enter the Spark environment
docker exec -it spark_master_1 bash
# in the container shell, install Maven (mvn):
export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
Run the Spark Connector example
Option 1 (recommended): via PySpark
- Enter the PySpark shell
~/.nebula-up/nebula-pyspark.sh
- Call the Nebula Spark Connector Reader:
# call Nebula Spark Connector Reader
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
# show the dataframe with limit of 2
df.show(n=2)
- Example of the returned result:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Python version 2.7.16 (default, Jan 14 2020 07:22:06)
SparkSession available as 'spark'.
>>> df = spark.read.format(
... "com.vesoft.nebula.connector.NebulaDataSource").option(
... "type", "vertex").option(
... "spaceName", "basketballplayer").option(
... "label", "player").option(
... "returnCols", "name,age").option(
... "metaAddress", "metad0:9559").option(
... "partitionNumber", 1).load()
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
Option 2: compile and submit the sample JAR package
- First clone the Spark Connector repository, which contains the sample code, then compile it (see its README.md for details):
cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git
docker exec -it spark_master_1 bash
cd /root/nebula-spark-connector
- Replace the code of the sample project
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
- Paste in the following code. It reads vertices and edges from the basketballplayer graph we loaded earlier, via readVertex and readEdges respectively.
package com.vesoft.nebula.examples.connector
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaSparkReaderExample {
private val LOG = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
.builder()
.master("local")
.config(sparkConf)
.getOrCreate()
readVertex(spark)
readEdges(spark)
spark.close()
sys.exit()
}
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
def readEdges(spark: SparkSession): Unit = {
LOG.info("start to read nebula edges")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("follow")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withLimit(10)
.withPartitionNum(10)
.build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
edge.printSchema()
edge.show(20)
println("edge count: " + edge.count())
}
}
- Then package it into a JAR:
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
- Finally, submit it to Spark for execution:
cd example
/spark/bin/spark-submit --master "local" \
--class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
--driver-memory 4g target/example-3.0-SNAPSHOT.jar
# exit the Spark container
exit
- On success, we get the following result:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
+---------+------------------+---+
|_vertexId| name|age|
+---------+------------------+---+
|player105| Danny Green| 31|
|player109| Tiago Splitter| 34|
|player111| David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114| Tracy McGrady| 39|
|player150| Luka Doncic| 20|
|player103| Rudy Gay| 32|
|player113| Dejounte Murray| 29|
|player121| Chris Paul| 33|
|player128| Carmelo Anthony| 34|
|player130| Joel Embiid| 25|
|player136| Steve Nash| 45|
|player108| Boris Diaw| 36|
|player122| DeAndre Jordan| 30|
|player123| Ricky Rubio| 28|
|player139| Marc Gasol| 34|
|player142| Klay Thompson| 29|
|player145| JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
+---------+------------------+---+
only showing top 20 rows
22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
+---------+---------+-----+------+
| _srcId| _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100| 0| 70|
|player105|player104| 0| 83|
|player105|player116| 0| 80|
|player109|player100| 0| 80|
|player109|player125| 0| 90|
|player118|player120| 0| 90|
|player118|player131| 0| 90|
|player143|player150| 0| 90|
|player114|player103| 0| 90|
|player114|player115| 0| 90|
|player114|player140| 0| 90|
|player150|player120| 0| 80|
|player150|player137| 0| 90|
|player150|player143| 0| 90|
|player103|player102| 0| 70|
|player113|player100| 0| 99|
|player113|player101| 0| 99|
|player113|player104| 0| 99|
|player113|player105| 0| 99|
|player113|player106| 0| 99|
+---------+---------+-----+------+
only showing top 20 rows
Nebula Exchange
- Code: https://github.com/vesoft-inc/nebula-exchange/
- Docs: https://docs.nebula-graph.com.cn/3.1.0/nebula-exchange/about-exchange/ex-ug-what-is-exchange/
- JAR package: https://github.com/vesoft-inc/nebula-exchange/releases
- Configuration example: exchange-common/src/test/resources/application.conf
- First create a configuration file so that Exchange knows how to read and write the data
- Then invoke the Exchange JAR with that configuration file
Try Exchange in one click
Run it first and have a look:
~/.nebula-up/nebula-exchange-example.sh
Looking at the details: the task imports a tiny CSV data source, player.csv, containing two records:
player800,"Foo Bar",23
player801,"Another Name",21
- We can enter the Spark environment to take a look:
docker exec -it spark_master_1 bash
cd /root
- Here is the configuration file, exchange.conf, that was specified when the Exchange task was submitted. It is in HOCON format: the .nebula section describes the Nebula Graph cluster, and the .tags section describes how to map the required fields of our data source (a CSV file here) onto the vertices.
{
# Spark relation config
spark: {
app: {
name: Nebula Exchange
}
master:local
driver: {
cores: 1
maxResultSize: 1G
}
executor: {
memory: 1G
}
cores:{
max: 16
}
}
# Nebula Graph relation config
nebula: {
address:{
graph:["graphd:9669"]
meta:["metad0:9559", "metad1:9559", "metad2:9559"]
}
user: root
pswd: nebula
space: basketballplayer
# parameters for SST import, not required
path:{
local:"/tmp"
remote:"/sst"
hdfs.namenode: "hdfs://localhost:9000"
}
# nebula client connection parameters
connection {
# socket connect & execute timeout, unit: millisecond
timeout: 30000
}
error: {
# max number of failures, if the number of failures is bigger than max, then exit the application.
max: 32
# failed import job will be recorded in output path
output: /tmp/errors
}
# use google's RateLimiter to limit the requests send to NebulaGraph
rate: {
# the stable throughput of RateLimiter
limit: 1024
# Acquires a permit from RateLimiter, unit: MILLISECONDS
# if it can't be obtained within the specified timeout, then give up the request.
timeout: 1000
}
}
# Processing tags
# There are tag config examples for different dataSources.
tags: [
# HDFS csv
# Import mode is client, just change type.sink to sst if you want to use client import mode.
{
name: player
type: {
source: csv
sink: client
}
path: "file:///root/player.csv"
# if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
fields: [_c1, _c2]
nebula.fields: [name, age]
vertex: {
field:_c0
}
separator: ","
header: false
batch: 256
partition: 32
}
]
}
- We can see that the CSV data source and the configuration file are in the same directory:
bash-5.0# ls -l
total 24
drwxrwxr-x 2 1000 1000 4096 Jun 1 04:26 download
-rw-rw-r-- 1 1000 1000 1908 Jun 1 04:23 exchange.conf
-rw-rw-r-- 1 1000 1000 2593 Jun 1 04:23 hadoop.env
drwxrwxr-x 7 1000 1000 4096 Jun 6 03:27 nebula-spark-connector
-rw-rw-r-- 1 1000 1000 51 Jun 1 04:23 player.csv
- Then, we can also manually resubmit this Exchange task:
/spark/bin/spark-submit --master local \
--class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \
-c exchange.conf
- Partial output:
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
...
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0
...
Nebula Algorithm
- Code: https://github.com/vesoft-inc/nebula-algorithm
- Docs: https://docs.nebula-graph.com.cn/3.1.0/nebula-algorithm/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/
- Sample code: example/src/main/scala/com/vesoft/nebula/algorithm
Submitting a task via spark-submit
- Load the LiveJournal dataset
~/.nebula-up/load-LiveJournal-dataset.sh
- Run the PageRank algorithm on the LiveJournal graph and write the result to a CSV file
~/.nebula-up/nebula-algo-pagerank-example.sh
- Check the output :
docker exec -it spark_master_1 bash
head /output/part*000.csv
_id,pagerank
637100,0.9268620883822242
108150,1.1855749056722755
957460,0.923720299211093
257320,0.9967932799358413
Configuration file walkthrough
The .data section specifies that the source is nebula, meaning the graph data is read from the cluster, and that the sink is csv, meaning the result is written to a local file.
data: {
# data source. optional of nebula,csv,json
source: nebula
# data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
sink: csv
# if your algorithm needs weight
hasWeight: false
}
The .nebula.read section specifies what to read from the Nebula Graph cluster; here all edges of the edge type follow are read as the whole graph.
nebula: {
# algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
read: {
# Nebula metad server address, multiple addresses are split by English comma
metaAddress: "metad0:9559"
# Nebula space
space: livejournal
# Nebula edge types, multiple labels means that data from multiple edges will union together
labels: ["follow"]
# Nebula edge property name for each edge type, this property will be as weight col for algorithm.
# Make sure the weightCols are corresponding to labels.
weightCols: []
}
The .algorithm section configures which algorithm to call and its parameters:
algorithm: {
executeAlgo: pagerank
# PageRank parameter
pagerank: {
maxIter: 10
resetProb: 0.15 # default 0.15
  }
}
Calling Nebula Algorithm as a library in Spark
- Gives more control over the algorithm's output format / allows customization
- Can handle the case of non-numeric vertex IDs; see here. A minimal sketch of the library call follows.
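As an illustration of this library call path, here is a minimal sketch that reads the follow edges with the Spark Connector and feeds them to nebula-algorithm's PageRankAlgo. The class and method names follow the nebula-algorithm README, but constructor arguments and signatures may differ between versions, and the vertex IDs must be numeric (or encoded first), so treat this as a sketch under those assumptions.

import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo
import org.apache.spark.sql.SparkSession

object PageRankAsLibrarySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // Read the "follow" edges of the livejournal space as a DataFrame,
    // the same Spark Connector read path as in the examples above.
    val config = NebulaConnectionConfig
      .builder()
      .withMetaAddress("metad0:9559")
      .withConenctionRetry(2)
      .build()
    val readEdgeConfig = ReadNebulaConfig
      .builder()
      .withSpace("livejournal")
      .withLabel("follow")
      .withNoColumn(true) // no property columns needed for unweighted PageRank
      .withPartitionNum(10)
      .build()
    val edges = spark.read.nebula(config, readEdgeConfig).loadEdgesToDF()

    // Run PageRank: 10 iterations, reset probability 0.15, no edge weights.
    val prConfig = new PRConfig(10, 0.15)
    val ranks = PageRankAlgo.apply(spark, edges, prConfig, false)

    // The result is an ordinary DataFrame, so we control the output format ourselves.
    ranks.write.option("header", "true").csv("/output/pagerank_lib")

    spark.close()
  }
}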