Learn about the Spark projects on Nebula Graph
2022-07-25 08:55:00 【InfoQ】
The three Spark subprojects of Nebula Graph
- Nebula Spark Connector is a Spark library that lets Spark applications read graph data from, and write graph data to, Nebula Graph in DataFrame form.
- Nebula Exchange is built on top of the Nebula Spark Connector. It is both a Spark library and a Spark application that can be executed directly as a JAR package via spark-submit. It is designed to exchange data between different data sources and Nebula Graph (one-way, write only, in the open-source edition; two-way in the enterprise edition). Nebula Exchange supports many types of data sources, such as MySQL, Neo4j, PostgreSQL, ClickHouse, Hive, etc. Besides writing directly into Nebula Graph, it can also generate SST files and ingest them into Nebula Graph, using compute resources outside the cluster to help with the underlying sorting.
- Nebula Algorithm is built on top of the Nebula Spark Connector and GraphX. It is likewise both a Spark library and a Spark application, and is used to run common graph algorithms (PageRank, LPA, etc.) on graphs stored in Nebula Graph.
Nebula Spark Connector
- Code: https://github.com/vesoft-inc/nebula-spark-connector
- Docs: https://docs.nebula-graph.io/3.1.0/nebula-spark-connector/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/
- Code example: example
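To use the connector as a library in your own Spark project, the JAR can be pulled from the Maven repository linked above. A minimal sbt sketch follows; the version number is only illustrative, pick the release that matches your Nebula Graph and Spark versions:
// build.sbt (illustrative): depend on the Nebula Spark Connector
libraryDependencies += "com.vesoft" % "nebula-spark-connector" % "3.0.0"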
Nebula Graph Spark Reader
For example, to read vertices of the tag player from the basketballplayer graph space, specify withLabel("player") and withReturnCols(List("name", "age")), then call spark.read.nebula(...).loadVerticesToDF():
def readVertex(spark: SparkSession): Unit = {
  LOG.info("start to read nebula vertices")
  val config =
    NebulaConnectionConfig
      .builder()
      .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
      .withConenctionRetry(2)
      .build()
  val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
    .builder()
    .withSpace("basketballplayer")
    .withLabel("player")
    .withNoColumn(false)
    .withReturnCols(List("name", "age"))
    .withLimit(10)
    .withPartitionNum(10)
    .build()
  val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
  vertex.printSchema()
  vertex.show(20)
  println("vertex count: " + vertex.count())
}
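Reading is only half of what the connector does: a DataFrame can also be written back into Nebula Graph. Below is a minimal writer-side sketch based on the connector documentation; note that writing additionally needs the graphd address, and the tag and field names here (tag player, DataFrame column id) are illustrative:
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import org.apache.spark.sql.DataFrame

def writeVertex(df: DataFrame): Unit = {
  // writing needs the graphd address as well as metad
  val config = NebulaConnectionConfig
    .builder()
    .withMetaAddress("metad0:9559")
    .withGraphAddress("graphd:9669")
    .withConenctionRetry(2)
    .build()
  // map the DataFrame column "id" to the vertex ID and write into tag "player"
  val writeConfig = WriteNebulaVertexConfig
    .builder()
    .withSpace("basketballplayer")
    .withTag("player")
    .withVidField("id")
    .withBatch(256)
    .build()
  df.write.nebula(config, writeConfig).writeVertices()
}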
Get started with the Nebula Spark Connector
Spin up the environment
# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
# Connect to nebula with console
~/.nebula-up/console.sh
# Execute any queries, e.g.
~/.nebula-up/console.sh -e "SHOW HOSTS"
# Load the sample dataset
~/.nebula-up/load-basketballplayer-dataset.sh
# Wait a minute or so
# Make a graph query against the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'
Enter the Spark environment
docker exec -it spark_master_1 bash
Install mvn (Maven) inside the Spark container:
# in the container shell
export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
Run the Spark Connector example
Option 1 (recommended): via PySpark
- Enter the PySpark shell
~/.nebula-up/nebula-pyspark.sh
- Call the Nebula Spark Connector reader
# call Nebula Spark Connector Reader
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
# show the dataframe with limit of 2
df.show(n=2)
- Example of the returned result:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Python version 2.7.16 (default, Jan 14 2020 07:22:06)
SparkSession available as 'spark'.
>>> df = spark.read.format(
... "com.vesoft.nebula.connector.NebulaDataSource").option(
... "type", "vertex").option(
... "spaceName", "basketballplayer").option(
... "label", "player").option(
... "returnCols", "name,age").option(
... "metaAddress", "metad0:9559").option(
... "partitionNumber", 1).load()
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
Option 2: compile and submit the sample JAR package
- First clone the Spark Connector repository, which includes the example code, then compile it:
cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git
docker exec -it spark_master_1 bash
cd /root/nebula-spark-connector
- Replace the code of the sample project
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
- Paste in the following code. Here we read vertices and edges from the basketballplayer graph loaded earlier, by calling readVertex and readEdges respectively.
package com.vesoft.nebula.examples.connector

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaSparkReaderExample {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    readVertex(spark)
    readEdges(spark)

    spark.close()
    sys.exit()
  }

  def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("basketballplayer")
      .withLabel("player")
      .withNoColumn(false)
      .withReturnCols(List("name", "age"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    vertex.printSchema()
    vertex.show(20)
    println("vertex count: " + vertex.count())
  }

  def readEdges(spark: SparkSession): Unit = {
    LOG.info("start to read nebula edges")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("basketballplayer")
      .withLabel("follow")
      .withNoColumn(false)
      .withReturnCols(List("degree"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
    edge.printSchema()
    edge.show(20)
    println("edge count: " + edge.count())
  }

}
- Then package it into a JAR:
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
- Finally, submit it for execution inside Spark:
cd example
/spark/bin/spark-submit --master "local" \
--class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
--driver-memory 4g target/example-3.0-SNAPSHOT.jar
# exit the spark container
exit
- On success, we get the following results:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
+---------+------------------+---+
|_vertexId| name|age|
+---------+------------------+---+
|player105| Danny Green| 31|
|player109| Tiago Splitter| 34|
|player111| David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114| Tracy McGrady| 39|
|player150| Luka Doncic| 20|
|player103| Rudy Gay| 32|
|player113| Dejounte Murray| 29|
|player121| Chris Paul| 33|
|player128| Carmelo Anthony| 34|
|player130| Joel Embiid| 25|
|player136| Steve Nash| 45|
|player108| Boris Diaw| 36|
|player122| DeAndre Jordan| 30|
|player123| Ricky Rubio| 28|
|player139| Marc Gasol| 34|
|player142| Klay Thompson| 29|
|player145| JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
+---------+------------------+---+
only showing top 20 rows
22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
+---------+---------+-----+------+
| _srcId| _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100| 0| 70|
|player105|player104| 0| 83|
|player105|player116| 0| 80|
|player109|player100| 0| 80|
|player109|player125| 0| 90|
|player118|player120| 0| 90|
|player118|player131| 0| 90|
|player143|player150| 0| 90|
|player114|player103| 0| 90|
|player114|player115| 0| 90|
|player114|player140| 0| 90|
|player150|player120| 0| 80|
|player150|player137| 0| 90|
|player150|player143| 0| 90|
|player103|player102| 0| 70|
|player113|player100| 0| 99|
|player113|player101| 0| 99|
|player113|player104| 0| 99|
|player113|player105| 0| 99|
|player113|player106| 0| 99|
+---------+---------+-----+------+
only showing top 20 rows
Nebula Exchange
- Code: https://github.com/vesoft-inc/nebula-exchange/
- Docs: https://docs.nebula-graph.com.cn/3.1.0/nebula-exchange/about-exchange/ex-ug-what-is-exchange/
- JAR package: https://github.com/vesoft-inc/nebula-exchange/releases
- Configuration example: exchange-common/src/test/resources/application.conf
- First create a configuration file so that Exchange knows how to fetch and write the data
- Then call the Exchange JAR package with that configuration file
Try Exchange in one click
Run it first and have a look
~/.nebula-up/nebula-exchange-example.sh
Look at some details
The data source in this example is a minimal CSV file, player.csv, containing two vertices:
player800,"Foo Bar",23
player801,"Another Name",21
- We can enter the Spark environment and take a look:
docker exec -it spark_master_1 bash
cd /root
- You can see exchange.conf, the configuration file we specified when submitting the Exchange task
It is a file in HOCON format: the .nebula section describes information about the Nebula Graph cluster, and the .tags section describes how the required fields map to our data source (a CSV file here), along with other information about the vertices.
{
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange
    }

    master:local

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
      memory: 1G
    }

    cores:{
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["graphd:9669"]
      meta:["metad0:9559", "metad1:9559", "metad2:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    # parameters for SST import, not required
    path:{
      local:"/tmp"
      remote:"/sst"
      hdfs.namenode: "hdfs://localhost:9000"
    }

    # nebula client connection parameters
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000
    }

    error: {
      # max number of failures, if the number of failures is bigger than max, then exit the application.
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors
    }

    # use google's RateLimiter to limit the requests send to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [
    # HDFS csv
    # Import mode is client, just change type.sink to sst if you want to use the sst import mode.
    {
      name: player
      type: {
        source: csv
        sink: client
      }
      path: "file:///root/player.csv"
      # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
      fields: [_c1, _c2]
      nebula.fields: [name, age]
      vertex: {
        field:_c0
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }
  ]
}
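A side note on the fields / nebula.fields mapping above: for a headerless CSV, Spark names the columns _c0, _c1, _c2, ... in order. If in doubt about which _cN holds which value, a quick sanity check can be done from a Spark shell in the container (the path is the one used in this walkthrough):
// load the headerless CSV the same way Exchange will see it
val csvDF = spark.read.option("header", "false").csv("file:///root/player.csv")
csvDF.printSchema() // columns appear as _c0 (vertex id), _c1 (name), _c2 (age)
csvDF.show()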
- We can see that the CSV data source and the configuration file sit in the same directory:
bash-5.0# ls -l
total 24
drwxrwxr-x 2 1000 1000 4096 Jun 1 04:26 download
-rw-rw-r-- 1 1000 1000 1908 Jun 1 04:23 exchange.conf
-rw-rw-r-- 1 1000 1000 2593 Jun 1 04:23 hadoop.env
drwxrwxr-x 7 1000 1000 4096 Jun 6 03:27 nebula-spark-connector
-rw-rw-r-- 1 1000 1000 51 Jun 1 04:23 player.csv
- Then we can manually submit this Exchange task again:
/spark/bin/spark-submit --master local \
--class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \
-c exchange.conf
- Partial output:
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
...
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0
...
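The batchSuccess.player: 2 line means both rows of player.csv were written. As a hypothetical cross-check, these vertices can be read back with the Nebula Spark Connector reader shown earlier, reusing the config and nebulaReadVertexConfig values from that example (with withLimit raised enough to cover all players):
// read tag `player` back and look for the two vertices Exchange just imported
val players = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
players.filter("_vertexId in ('player800', 'player801')").show()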
Nebula Algorithm
- Code: https://github.com/vesoft-inc/nebula-algorithm
- Docs: https://docs.nebula-graph.com.cn/3.1.0/nebula-algorithm/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/
- Example code: example/src/main/scala/com/vesoft/nebula/algorithm
Submit a task via spark-submit
- Load the LiveJournal dataset
~/.nebula-up/load-LiveJournal-dataset.sh
- Run the PageRank algorithm on the LiveJournal graph, with the results written to a CSV file
~/.nebula-up/nebula-algo-pagerank-example.sh
- Check the output:
docker exec -it spark_master_1 bash
head /output/part*000.csv
_id,pagerank
637100,0.9268620883822242
108150,1.1855749056722755
957460,0.923720299211093
257320,0.9967932799358413
Configuration file walkthrough
- The .data section specifies source as nebula, meaning the graph data is fetched from the cluster, and sink as csv, meaning the output is written to a local file.
data: {
  # data source. optional of nebula,csv,json
  source: nebula
  # data sink, means the algorithm result will be written into this sink. optional of nebula,csv,text
  sink: csv
  # if your algorithm needs weight
  hasWeight: false
}
- The .nebula.read section specifies how to read from the Nebula Graph cluster; here all edges of the edge type follow are read as one whole graph
nebula: {
  # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
  read: {
    # Nebula metad server address, multiple addresses are split by English comma
    metaAddress: "metad0:9559"
    # Nebula space
    space: livejournal
    # Nebula edge types, multiple labels means that data from multiple edges will union together
    labels: ["follow"]
    # Nebula edge property name for each edge type, this property will be used as the weight col for the algorithm.
    # Make sure the weightCols correspond to labels.
    weightCols: []
  }
}
- The .algorithm section configures which algorithm to run and its parameters
algorithm: {
  executeAlgo: pagerank

  # PageRank parameter
  pagerank: {
    maxIter: 10
    resetProb: 0.15 # default 0.15
  }
}
Calling Nebula Algorithm as a library in Spark
- Gives more control over the algorithm's output format / allows customization, as in the sketch below
- Handles the case where vertex IDs are non-numeric, see here
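A minimal sketch of the library-style call, based on the nebula-algorithm README; the edge list df is assumed to already be a DataFrame with numeric src and dst columns (string vertex IDs would first be encoded to Long), and the PRConfig constructor arguments (maxIter, resetProb) may differ slightly between versions:
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo

// df: edge-list DataFrame with numeric src/dst columns
val prConfig = new PRConfig(10, 0.15)                          // maxIter, resetProb
val prResult = PageRankAlgo.apply(spark, df, prConfig, false)  // false = unweighted
prResult.show()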