Learn about the Spark projects on Nebula Graph
2022-07-25 08:55:00 【InfoQ】
The three Spark sub-projects of Nebula Graph
- Nebula Spark Connector is a Spark library that enables Spark applications to read graph data from and write graph data to Nebula Graph in DataFrame form.
- Nebula Exchange is built on top of Nebula Spark Connector. It is both a Spark library and a JAR application that can be executed directly with spark-submit. Its design goal is to exchange data between different data sources and Nebula Graph (one-way, write only, in the open-source edition; bidirectional in the enterprise edition). Nebula Exchange supports many types of data sources, such as MySQL, Neo4j, PostgreSQL, ClickHouse, Hive, etc. Besides writing directly into Nebula Graph, it can also generate SST files and ingest them into Nebula Graph, so that computing power outside the Nebula Graph cluster takes care of the underlying sorting.
- Nebula Algorithm is built on top of Nebula Spark Connector and GraphX. It is also both a Spark library and a Spark application, and is used to run common graph algorithms (PageRank, LPA, etc.) on the graphs stored in Nebula Graph.
Nebula Spark Connector
- Code: https://github.com/vesoft-inc/nebula-spark-connector
- Docs: https://docs.nebula-graph.io/3.1.0/nebula-spark-connector/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/
- Code example: example
Nebula Graph Spark Reader
To read data from Nebula Graph into a DataFrame, for example the vertices of the tag player, we set the tag with withLabel("player"), choose the returned properties with withReturnCols(List("name", "age")), and call spark.read.nebula(...).loadVerticesToDF():
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
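The connector covers the write direction as well. Below is a minimal, hedged sketch of writing a DataFrame back as player vertices; the names WriteNebulaVertexConfig, withGraphAddress, withVidField, withBatch, and writeVertices() follow the connector's documented builder API and are not shown elsewhere in this article, so verify them against the connector version you actually use:

```scala
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: write a DataFrame (with columns id, name, age) into the player tag.
def writeVertex(spark: SparkSession, df: DataFrame): Unit = {
  // Writing needs the graphd address in addition to the metad addresses.
  val config = NebulaConnectionConfig
    .builder()
    .withMetaAddress("metad0:9559")
    .withGraphAddress("graphd:9669")
    .withConenctionRetry(2)
    .build()

  val writeVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
    .builder()
    .withSpace("basketballplayer")
    .withTag("player")
    .withVidField("id") // the DataFrame column used as the vertex ID
    .withBatch(512)
    .build()

  df.write.nebula(config, writeVertexConfig).writeVertices()
}
```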
Get started with Nebula Spark Connector
Spin up the environment
# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
# Connect to Nebula Graph with the console
~/.nebula-up/console.sh
# Execute any queries like
~/.nebula-up/console.sh -e "SHOW HOSTS"
# Load the sample dataset
~/.nebula-up/load-basketballplayer-dataset.sh
# Wait a minute or so
# Make a graph query against the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'
Enter the Spark environment
docker exec -it spark_master_1 bash
To compile projects with mvn later, install Maven inside the container:
# in the container shell
export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
Run the Spark Connector example
Option 1 (recommended): via PySpark
- Enter the PySpark shell
~/.nebula-up/nebula-pyspark.sh
- Call the Nebula Spark Connector reader
# call Nebula Spark Connector Reader
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
# show the dataframe with limit of 2
df.show(n=2)
- Example of the returned result
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Python version 2.7.16 (default, Jan 14 2020 07:22:06)
SparkSession available as 'spark'.
>>> df = spark.read.format(
... "com.vesoft.nebula.connector.NebulaDataSource").option(
... "type", "vertex").option(
... "spaceName", "basketballplayer").option(
... "label", "player").option(
... "returnCols", "name,age").option(
... "metaAddress", "metad0:9559").option(
... "partitionNumber", 1).load()
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
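The same options-based DataFrame source can also be used from Scala, for example from spark-shell or the example project. Below is a hedged sketch that reads the follow edges instead of vertices; it assumes the source accepts the option value type = edge and an edge property name in returnCols in the same way as the vertex call above:

```scala
// Mirror of the PySpark call above, but from Scala and for edges.
val edgeDF = spark.read
  .format("com.vesoft.nebula.connector.NebulaDataSource")
  .option("type", "edge")                  // "vertex" above, "edge" here
  .option("spaceName", "basketballplayer")
  .option("label", "follow")               // edge type
  .option("returnCols", "degree")          // edge property to return
  .option("metaAddress", "metad0:9559")
  .option("partitionNumber", "1")
  .load()

edgeDF.show(2)
```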
Option 2: compile and submit the example JAR package
- First clone the Spark Connector repository together with its example code (see its README.md), then compile it:
cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git
docker exec -it spark_master_1 bash
cd /root/nebula-spark-connector
- Replace the code of the sample project
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
- Paste in the following code. It reads vertices and edges from the basketballplayer graph we loaded earlier, calling readVertex and readEdges respectively:
package com.vesoft.nebula.examples.connector
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaSparkReaderExample {
private val LOG = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
.builder()
.master("local")
.config(sparkConf)
.getOrCreate()
readVertex(spark)
readEdges(spark)
spark.close()
sys.exit()
}
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
def readEdges(spark: SparkSession): Unit = {
LOG.info("start to read nebula edges")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("follow")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withLimit(10)
.withPartitionNum(10)
.build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
edge.printSchema()
edge.show(20)
println("edge count: " + edge.count())
}
}
- Then package it into a JAR
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
- Finally, submit it to Spark for execution:
cd example
/spark/bin/spark-submit --master "local" \
--class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
--driver-memory 4g target/example-3.0-SNAPSHOT.jar
# exit the Spark container
exit
- On success, we get the following result:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
+---------+------------------+---+
|_vertexId| name|age|
+---------+------------------+---+
|player105| Danny Green| 31|
|player109| Tiago Splitter| 34|
|player111| David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114| Tracy McGrady| 39|
|player150| Luka Doncic| 20|
|player103| Rudy Gay| 32|
|player113| Dejounte Murray| 29|
|player121| Chris Paul| 33|
|player128| Carmelo Anthony| 34|
|player130| Joel Embiid| 25|
|player136| Steve Nash| 45|
|player108| Boris Diaw| 36|
|player122| DeAndre Jordan| 30|
|player123| Ricky Rubio| 28|
|player139| Marc Gasol| 34|
|player142| Klay Thompson| 29|
|player145| JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
+---------+------------------+---+
only showing top 20 rows
22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
+---------+---------+-----+------+
| _srcId| _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100| 0| 70|
|player105|player104| 0| 83|
|player105|player116| 0| 80|
|player109|player100| 0| 80|
|player109|player125| 0| 90|
|player118|player120| 0| 90|
|player118|player131| 0| 90|
|player143|player150| 0| 90|
|player114|player103| 0| 90|
|player114|player115| 0| 90|
|player114|player140| 0| 90|
|player150|player120| 0| 80|
|player150|player137| 0| 90|
|player150|player143| 0| 90|
|player103|player102| 0| 70|
|player113|player100| 0| 99|
|player113|player101| 0| 99|
|player113|player104| 0| 99|
|player113|player105| 0| 99|
|player113|player106| 0| 99|
+---------+---------+-----+------+
only showing top 20 rows
Nebula Exchange
- Code: https://github.com/vesoft-inc/nebula-exchange/
- Docs: https://docs.nebula-graph.com.cn/3.1.0/nebula-exchange/about-exchange/ex-ug-what-is-exchange/
- JAR package: https://github.com/vesoft-inc/nebula-exchange/releases
- Configuration example: exchange-common/src/test/resources/application.conf
- First, create a configuration file so that Exchange knows where to read the data and how to write it
- Then invoke the Exchange JAR package with that configuration file
One-click trial of Exchange
Run it first and take a look:
~/.nebula-up/nebula-exchange-example.sh
A look at the details
The data source in this one-click example is a CSV file, player.csv, which contains two records:
player800,"Foo Bar",23
player801,"Another Name",21
- We can go into the Spark environment to take a look:
docker exec -it spark_master_1 bash
cd /root
- You can see exchange.conf, the configuration file that was specified when the Exchange task was submitted
exchange.conf is in the HOCON file format: the .nebula section describes the Nebula Graph cluster, and the .tags section describes how the required fields of vertices are mapped from our data source (here, a CSV file). A small Spark sketch of this mapping follows the file below.
{
# Spark relation config
spark: {
app: {
name: Nebula Exchange
}
master:local
driver: {
cores: 1
maxResultSize: 1G
}
executor: {
memory: 1G
}
cores:{
max: 16
}
}
# Nebula Graph relation config
nebula: {
address:{
graph:["graphd:9669"]
meta:["metad0:9559", "metad1:9559", "metad2:9559"]
}
user: root
pswd: nebula
space: basketballplayer
# parameters for SST import, not required
path:{
local:"/tmp"
remote:"/sst"
hdfs.namenode: "hdfs://localhost:9000"
}
# nebula client connection parameters
connection {
# socket connect & execute timeout, unit: millisecond
timeout: 30000
}
error: {
# max number of failures, if the number of failures is bigger than max, then exit the application.
max: 32
# failed import job will be recorded in output path
output: /tmp/errors
}
# use google's RateLimiter to limit the requests send to NebulaGraph
rate: {
# the stable throughput of RateLimiter
limit: 1024
# Acquires a permit from RateLimiter, unit: MILLISECONDS
# if it can't be obtained within the specified timeout, then give up the request.
timeout: 1000
}
}
# Processing tags
# There are tag config examples for different dataSources.
tags: [
# HDFS csv
# Import mode is client, just change type.sink to sst if you want to use sst import mode.
{
name: player
type: {
source: csv
sink: client
}
path: "file:///root/player.csv"
# if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
fields: [_c1, _c2]
nebula.fields: [name, age]
vertex: {
field:_c0
}
separator: ","
header: false
batch: 256
partition: 32
}
]
}
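To make the fields / nebula.fields mapping in the tag section concrete, the following is a rough Spark equivalent of what Exchange does with this CSV. This is a simplified sketch only; the real Exchange job also handles batching, retries, and optional SST generation:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// Read the header-less CSV; Spark names the columns _c0, _c1, _c2.
val csv = spark.read
  .option("header", "false")
  .option("sep", ",")
  .csv("file:///root/player.csv")

// fields [_c1, _c2] are mapped to nebula.fields [name, age];
// the vertex ID comes from column _c0 (the vertex.field setting).
val playerVertices = csv.select(
  csv("_c0").as("vertexId"),
  csv("_c1").as("name"),
  csv("_c2").cast("int").as("age")
)

playerVertices.show()
```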
- Notice that the CSV data source and the configuration file are in the same directory:
bash-5.0# ls -l
total 24
drwxrwxr-x 2 1000 1000 4096 Jun 1 04:26 download
-rw-rw-r-- 1 1000 1000 1908 Jun 1 04:23 exchange.conf
-rw-rw-r-- 1 1000 1000 2593 Jun 1 04:23 hadoop.env
drwxrwxr-x 7 1000 1000 4096 Jun 6 03:27 nebula-spark-connector
-rw-rw-r-- 1 1000 1000 51 Jun 1 04:23 player.csv
- We can then manually re-submit this Exchange task ourselves:
/spark/bin/spark-submit --master local \
--class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \
-c exchange.conf
- Partial output:
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
...
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0
...
Nebula Algorithm
- Code: https://github.com/vesoft-inc/nebula-algorithm
- Docs: https://docs.nebula-graph.com.cn/3.1.0/nebula-algorithm/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/
- Example code: example/src/main/scala/com/vesoft/nebula/algorithm
Submit a task via spark-submit
- Load the LiveJournal dataset
~/.nebula-up/load-LiveJournal-dataset.sh
- Run the PageRank algorithm on the LiveJournal graph and write the results to a CSV file
~/.nebula-up/nebula-algo-pagerank-example.sh
- Check the output :
docker exec -it spark_master_1 bash
head /output/part*000.csv
_id,pagerank
637100,0.9268620883822242
108150,1.1855749056722755
957460,0.923720299211093
257320,0.9967932799358413
Configuration file walkthrough
The .data section specifies that the source is nebula, meaning graph data is read from the cluster, and that the sink is csv, meaning the algorithm results are written to a local file.
data: {
# data source. optional of nebula,csv,json
source: nebula
# data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
sink: csv
# if your algorithm needs weight
hasWeight: false
}
The .nebula.read section specifies how data is read from the Nebula Graph cluster. Here it reads all edges of the edge type follow as one whole graph.
nebula: {
# algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
read: {
# Nebula metad server address, multiple addresses are split by English comma
metaAddress: "metad0:9559"
# Nebula space
space: livejournal
# Nebula edge types, multiple labels means that data from multiple edges will union together
labels: ["follow"]
# Nebula edge property name for each edge type, this property will be as weight col for algorithm.
# Make sure the weightCols are corresponding to labels.
weightCols: []
}
}
The .algorithm section configures which algorithm to call, along with the algorithm's parameters.
algorithm: {
executeAlgo: pagerank
# PageRank parameter
pagerank: {
maxIter: 10
resetProb: 0.15 # default 0.15
}
}
Calling Nebula Algorithm as a library in Spark
- This gives more control over the output format of the algorithm and allows custom processing (see the sketch below)
- It also covers the case where vertex IDs are not numeric, see here
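A minimal sketch of such a library call is shown below. The class and method names (PRConfig, PageRankAlgo.apply) follow the nebula-algorithm README and may differ between versions, and the tiny hand-built edge list is only an illustration; in practice the edge DataFrame would typically come from the Nebula Spark Connector reader shown earlier:

```scala
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo
import org.apache.spark.sql.SparkSession

object PageRankAsLibExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    // Edge list with numeric vertex IDs (srcId, dstId); non-numeric IDs
    // would need to be encoded to numbers first, as mentioned above.
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 3L)).toDF("src", "dst")

    // Mirror the values from the configuration file above: 10 iterations, resetProb 0.15.
    val prConfig = new PRConfig(10, 0.15)
    // Fourth argument: whether the edges carry a weight column (false here).
    val ranks = PageRankAlgo.apply(spark, edges, prConfig, false)

    // Columns are expected to look like _id and pagerank, as in the CSV output above.
    ranks.show()
    spark.close()
  }
}
```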