Learn about the Spark projects on Nebula Graph
2022-07-25 08:55:00 【InfoQ】
The three Spark sub-projects of Nebula Graph
- Nebula Spark Connector is a Spark library that lets Spark applications read graph data from, and write graph data to, Nebula Graph in DataFrame form.
- Nebula Exchange is built on top of the Nebula Spark Connector. It is a Spark library and, at the same time, a JAR application that can be executed directly via spark-submit. Its design goal is to exchange data between different data sources and Nebula Graph (for the open source version this is one-way: writing in; for the enterprise edition it is bidirectional). Nebula Exchange supports many types of data sources, such as MySQL, Neo4j, PostgreSQL, ClickHouse, Hive, etc. Besides writing directly into Nebula Graph, it can also generate SST files and ingest them into Nebula Graph, so that computing power outside the Nebula Graph cluster helps with the underlying sorting.
- Nebula Algorithm is built on top of the Nebula Spark Connector and GraphX. It is also both a Spark library and a Spark application, used to run common graph algorithms (PageRank, LPA, etc.) on graphs stored in Nebula Graph.
Nebula Spark Connector
- Code: https://github.com/vesoft-inc/nebula-spark-connector
- Documentation: https://docs.nebula-graph.io/3.1.0/nebula-spark-connector/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/
- Code example: example
Nebula Graph Spark Reader
To read vertex data tagged player, for example, we specify the tag with withLabel("player"), choose the properties to return with withReturnCols(List("name", "age")), and then call spark.read.nebula(...).loadVerticesToDF():
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
Getting started with the Nebula Spark Connector
Spin up the environment
# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
# Connect to nebula with console
~/.nebula-up/console.sh
# Execute queries, for example
~/.nebula-up/console.sh -e "SHOW HOSTS"
# Load the sample dataset
~/.nebula-up/load-basketballplayer-dataset.sh
# Wait a minute or so
# Make a graph query against the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'
Enter the Spark environment
docker exec -it spark_master_1 bash
Optionally, install mvn inside the container if you want to compile there:
docker exec -it spark_master_1 bash
# in the container shell
export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
Run the Spark Connector example
Option 1 (recommended): via PySpark
- Enter the PySpark shell
~/.nebula-up/nebula-pyspark.sh
- Call the Nebula Spark Connector reader
# call Nebula Spark Connector Reader
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
# show the dataframe with limit of 2
df.show(n=2)
- Example of the returned result
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Python version 2.7.16 (default, Jan 14 2020 07:22:06)
SparkSession available as 'spark'.
>>> df = spark.read.format(
... "com.vesoft.nebula.connector.NebulaDataSource").option(
... "type", "vertex").option(
... "spaceName", "basketballplayer").option(
... "label", "player").option(
... "returnCols", "name,age").option(
... "metaAddress", "metad0:9559").option(
... "partitionNumber", 1).load()
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
Option 2: compile and submit the example JAR package
- First clone the Spark Connector repository along with its example code (see its README.md), then compile:
cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git
docker exec -it spark_master_1 bash
cd /root/nebula-spark-connector
- Replace the code of the sample project
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
- Paste the following code, which reads vertices and edges from the basketballplayer graph we loaded earlier, calling readVertex and readEdges respectively.
package com.vesoft.nebula.examples.connector
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaSparkReaderExample {
private val LOG = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
.builder()
.master("local")
.config(sparkConf)
.getOrCreate()
readVertex(spark)
readEdges(spark)
spark.close()
sys.exit()
}
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
def readEdges(spark: SparkSession): Unit = {
LOG.info("start to read nebula edges")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("follow")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withLimit(10)
.withPartitionNum(10)
.build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
edge.printSchema()
edge.show(20)
println("edge count: " + edge.count())
}
}
- Then package it into a JAR:
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
- Finally, submit it to Spark for execution:
cd example
/spark/bin/spark-submit --master "local" \
--class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
--driver-memory 4g target/example-3.0-SNAPSHOT.jar
# exit the Spark container
exit
- On success, we get the following result:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
+---------+------------------+---+
|_vertexId| name|age|
+---------+------------------+---+
|player105| Danny Green| 31|
|player109| Tiago Splitter| 34|
|player111| David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114| Tracy McGrady| 39|
|player150| Luka Doncic| 20|
|player103| Rudy Gay| 32|
|player113| Dejounte Murray| 29|
|player121| Chris Paul| 33|
|player128| Carmelo Anthony| 34|
|player130| Joel Embiid| 25|
|player136| Steve Nash| 45|
|player108| Boris Diaw| 36|
|player122| DeAndre Jordan| 30|
|player123| Ricky Rubio| 28|
|player139| Marc Gasol| 34|
|player142| Klay Thompson| 29|
|player145| JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
+---------+------------------+---+
only showing top 20 rows
22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
+---------+---------+-----+------+
| _srcId| _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100| 0| 70|
|player105|player104| 0| 83|
|player105|player116| 0| 80|
|player109|player100| 0| 80|
|player109|player125| 0| 90|
|player118|player120| 0| 90|
|player118|player131| 0| 90|
|player143|player150| 0| 90|
|player114|player103| 0| 90|
|player114|player115| 0| 90|
|player114|player140| 0| 90|
|player150|player120| 0| 80|
|player150|player137| 0| 90|
|player150|player143| 0| 90|
|player103|player102| 0| 70|
|player113|player100| 0| 99|
|player113|player101| 0| 99|
|player113|player104| 0| 99|
|player113|player105| 0| 99|
|player113|player106| 0| 99|
+---------+---------+-----+------+
only showing top 20 rows
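Before moving on, note that the connector supports the write direction as well as reading. The snippet below is a minimal write-path sketch (not from the original post), based on the WriteNebulaVertexConfig builder documented in the nebula-spark-connector repository; the DataFrame df, its id column, and the batch size are assumptions for illustration, and the addresses match the nebula-up environment used above.
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import org.apache.spark.sql.DataFrame

// Sketch only: assumes `df` has an `id` column plus the tag's property
// columns (e.g. name, age); space/tag names reuse the sample dataset.
def writeVertex(df: DataFrame): Unit = {
  val config = NebulaConnectionConfig
    .builder()
    .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
    .withGraphAddress("graphd:9669") // writes also go through graphd
    .withConenctionRetry(2)
    .build()
  val writeVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
    .builder()
    .withSpace("basketballplayer")
    .withTag("player")
    .withVidField("id") // which DataFrame column holds the vertex ID
    .withUser("root")
    .withPasswd("nebula")
    .withBatch(512)
    .build()
  df.write.nebula(config, writeVertexConfig).writeVertices()
}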
Nebula Exchange
- Code: https://github.com/vesoft-inc/nebula-exchange/
- Documentation: https://docs.nebula-graph.com.cn/3.1.0/nebula-exchange/about-exchange/ex-ug-what-is-exchange/
- JAR package: https://github.com/vesoft-inc/nebula-exchange/releases
- Configuration example: exchange-common/src/test/resources/application.conf
- First, create a configuration file so that Exchange knows how to fetch and write the data
- Then invoke the Exchange JAR package with that configuration file
Try Exchange with one click
Run it first and take a look
~/.nebula-up/nebula-exchange-example.sh
A closer look at the details
In this example, Exchange reads a CSV data source like the one below and writes it into Nebula Graph:
player800,"Foo Bar",23
player801,"Another Name",21
- We can take a look inside the Spark environment
docker exec -it spark_master_1 bash
cd /root
- Here you can see exchange.conf, the configuration file we specified when submitting the Exchange task
exchange.conf is in HOCON format: the .nebula section describes the Nebula Graph cluster, and the .tags section describes how the required fields of our data source (a CSV file here) map onto the vertices.
{
# Spark relation config
spark: {
app: {
name: Nebula Exchange
}
master:local
driver: {
cores: 1
maxResultSize: 1G
}
executor: {
memory: 1G
}
cores:{
max: 16
}
}
# Nebula Graph relation config
nebula: {
address:{
graph:["graphd:9669"]
meta:["metad0:9559", "metad1:9559", "metad2:9559"]
}
user: root
pswd: nebula
space: basketballplayer
# parameters for SST import, not required
path:{
local:"/tmp"
remote:"/sst"
hdfs.namenode: "hdfs://localhost:9000"
}
# nebula client connection parameters
connection {
# socket connect & execute timeout, unit: millisecond
timeout: 30000
}
error: {
# max number of failures, if the number of failures is bigger than max, then exit the application.
max: 32
# failed import job will be recorded in output path
output: /tmp/errors
}
# use google's RateLimiter to limit the requests send to NebulaGraph
rate: {
# the stable throughput of RateLimiter
limit: 1024
# Acquires a permit from RateLimiter, unit: MILLISECONDS
# if it can't be obtained within the specified timeout, then give up the request.
timeout: 1000
}
}
# Processing tags
# There are tag config examples for different dataSources.
tags: [
# HDFS csv
# Import mode is client, just change type.sink to sst if you want to use client import mode.
{
name: player
type: {
source: csv
sink: client
}
path: "file:///root/player.csv"
# if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
fields: [_c1, _c2]
nebula.fields: [name, age]
vertex: {
field:_c0
}
separator: ","
header: false
batch: 256
partition: 32
}
]
}
- We can see that the CSV data source and the configuration file are in the same directory:
bash-5.0# ls -l
total 24
drwxrwxr-x 2 1000 1000 4096 Jun 1 04:26 download
-rw-rw-r-- 1 1000 1000 1908 Jun 1 04:23 exchange.conf
-rw-rw-r-- 1 1000 1000 2593 Jun 1 04:23 hadoop.env
drwxrwxr-x 7 1000 1000 4096 Jun 6 03:27 nebula-spark-connector
-rw-rw-r-- 1 1000 1000 51 Jun 1 04:23 player.csv
- Then we can manually submit this Exchange task again:
/spark/bin/spark-submit --master local \
--class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \
-c exchange.conf
- Partial output:
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
...
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0
...
Nebula Algorithm
- Code: https://github.com/vesoft-inc/nebula-algorithm
- Documentation: https://docs.nebula-graph.com.cn/3.1.0/nebula-algorithm/
- JAR package: https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/
- Example code: example/src/main/scala/com/vesoft/nebula/algorithm
Submitting a task via spark-submit
- Load the LiveJournal dataset
~/.nebula-up/load-LiveJournal-dataset.sh
- Run the PageRank algorithm on the LiveJournal graph and output the results to a CSV file
~/.nebula-up/nebula-algo-pagerank-example.sh
- Check the output :
docker exec -it spark_master_1 bash
head /output/part*000.csv
_id,pagerank
637100,0.9268620883822242
108150,1.1855749056722755
957460,0.923720299211093
257320,0.9967932799358413
Interpreting the configuration file
.data specifies that the source is nebula, meaning the graph data is fetched from the cluster, and that the output sink is csv, written to a local file.
data: {
# data source. optional of nebula,csv,json
source: nebula
# data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
sink: csv
# if your algorithm needs weight
hasWeight: false
}
.nebula.read specifies how data is read from the Nebula Graph cluster; here, all edges of type follow are read as the whole graph.
nebula: {
# algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
read: {
# Nebula metad server address, multiple addresses are split by English comma
metaAddress: "metad0:9559"
# Nebula space
space: livejournal
# Nebula edge types, multiple labels means that data from multiple edges will union together
labels: ["follow"]
# Nebula edge property name for each edge type, this property will be as weight col for algorithm.
# Make sure the weightCols are corresponding to labels.
weightCols: []
}
}
.algorithm configures which algorithm to call, together with that algorithm's parameters.
algorithm: {
executeAlgo: pagerank
# PageRank parameter
pagerank: {
maxIter: 10
resetProb: 0.15 # default 0.15
}
}
Calling Nebula Algorithm as a library inside Spark
- This gives more control over the algorithm's output format / custom features (a minimal sketch is given after this list)
- It can also handle graphs whose vertex IDs are non-numeric; see here
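The snippet below is a minimal sketch of this library-call style (not from the original post), assuming the PageRankAlgo / PRConfig API found in the nebula-algorithm repository; the tiny in-memory edge DataFrame is purely illustrative, and in practice it could come from the Nebula Spark Connector reader (loadEdgesToDF) shown earlier.
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo

object PageRankAsLibraryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    // Assumption: an edge list whose first two columns are numeric src/dst IDs.
    val edges: DataFrame =
      Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 3L)).toDF("src", "dst")

    // Same parameters as the config file above: maxIter = 10, resetProb = 0.15
    val prConfig = new PRConfig(10, 0.15)

    // false = run unweighted PageRank; the result is a DataFrame of vertex
    // IDs and their pagerank scores that we can post-process freely.
    val ranks = PageRankAlgo.apply(spark, edges, prConfig, false)
    ranks.show()

    spark.close()
  }
}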