当前位置:网站首页>Use go language to import Doris data through stream load
Using Go to import data into Doris through Stream Load
2022-07-25 03:14:00 【Zhangjiafeng】
This article uses Go 1.17.2
and the Doris 0.15.0 release.
Doris data import examples are available in many languages, but a Go version is hard to find. After a brief study I wrote a simple Stream Load import example, for reference only.
The table structure used in the example :
-- Example table that the Stream Load sample writes into.
-- One row per user, keyed by (user_id, username) and hash-distributed on user_id.
CREATE TABLE IF NOT EXISTS user_info (
    user_id       LARGEINT     NOT NULL COMMENT " user id",
    username      VARCHAR(50)  NOT NULL COMMENT " user name ",
    city          VARCHAR(20)           COMMENT " User City ",
    age           SMALLINT              COMMENT " User age ",
    sex           TINYINT               COMMENT " User's gender ",
    phone         LARGEINT              COMMENT " Telephone ",
    address       VARCHAR(500)          COMMENT " Address ",
    register_time DATETIME              COMMENT " User registration time "
)
UNIQUE KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES ("replication_num" = "3");
Here is the Go example code. It supports importing from a file and importing from in-memory data. It also provides a method that fetches the list of BE nodes, so that when importing you can pick one BE node's IP and port at random from that list and connect to the BE directly.
// Command main is a small example that imports data into Apache Doris through
// the Stream Load HTTP interface, either from a file or from an in-memory
// string, and that can list the backend (BE) nodes known to the frontend.
package main

import (
	"bytes"
	"container/list"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"time"
)

// Fallback target used when the corresponding StreamLoad field is empty.
// These are the values the example was originally hard-wired to.
const (
	defaultFEAddress = "http://10.220.146.10:8030"
	defaultDBName    = "test_2"
	defaultTableName = "user_info"
)

// httpClient is shared by every request. The timeout is a generous upper
// bound whose only purpose is to keep a hung server from blocking forever.
var httpClient = &http.Client{Timeout: 10 * time.Minute}

// StreamLoad describes where to load data and which credentials to use.
//
// url is the base HTTP address (scheme://host:port) of the Doris node; dbName
// and tableName select the target table. Empty values fall back to the
// package defaults above. data is not read by any function in this file.
type StreamLoad struct {
	url       string
	dbName    string
	tableName string
	data      string
	userName  string
	password  string
}

// auth returns the base64 encoding of "userName:password", i.e. the credential
// part of an HTTP basic Authorization header. It deliberately prints nothing
// so that credentials never end up on stdout.
func auth(load StreamLoad) string {
	return base64.StdEncoding.EncodeToString([]byte(load.userName + ":" + load.password))
}

// orDefault returns value, or fallback when value is empty.
func orDefault(value, fallback string) string {
	if value == "" {
		return fallback
	}
	return value
}

// baseAddress returns the base HTTP address for load without a trailing slash.
func baseAddress(load StreamLoad) string {
	return strings.TrimRight(orDefault(load.url, defaultFEAddress), "/")
}

// streamLoadURL builds the _stream_load endpoint for the table described by load.
func streamLoadURL(load StreamLoad) string {
	db := url.PathEscape(orDefault(load.dbName, defaultDBName))
	table := url.PathEscape(orDefault(load.tableName, defaultTableName))
	return baseAddress(load) + "/api/" + db + "/" + table + "/_stream_load"
}

// newLabel returns a random version-4 UUID string that is used as the unique
// label of one load job.
func newLabel() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", fmt.Errorf("generating load label: %w", err)
	}
	b[6] = b[6]&0x0f | 0x40 // version 4
	b[8] = b[8]&0x3f | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// streamLoad PUTs payload to the Stream Load endpoint described by load and
// decodes the JSON reply.
//
// payload should be a *bytes.Reader or *strings.Reader: for those types
// net/http knows the content length and can replay the body if the server
// answers the PUT with a redirect.
func streamLoad(load StreamLoad, payload io.Reader) (*ResponseBody, error) {
	req, err := http.NewRequest(http.MethodPut, streamLoadURL(load), payload)
	if err != nil {
		return nil, fmt.Errorf("building stream load request: %w", err)
	}
	label, err := newLabel()
	if err != nil {
		return nil, err
	}
	req.Header.Add("Authorization", "basic "+auth(load))
	req.Header.Add("Expect", "100-continue")
	req.Header.Add("label", label)
	req.Header.Add("column_separator", ",")

	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("sending stream load request: %w", err)
	}
	defer resp.Body.Close()

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading stream load response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("stream load returned HTTP %d: %s", resp.StatusCode, raw)
	}

	var result ResponseBody
	if err := json.Unmarshal(raw, &result); err != nil {
		return nil, fmt.Errorf("decoding stream load response: %w", err)
	}
	return &result, nil
}

// reportLoadResult prints the outcome of a load: the number of loaded rows on
// success, the error URL when rows were filtered, or the server message and
// error URL when the load did not succeed.
func reportLoadResult(result *ResponseBody) {
	if result.Status != "Success" {
		fmt.Printf("Error Message : %s \n", result.Message)
		fmt.Printf("Error Data : %s ", result.ErrorURL)
		return
	}
	if result.NumberFilteredRows > 0 {
		// Some rows were rejected; the URL points at the details.
		fmt.Printf("Error Data : %s ", result.ErrorURL)
		return
	}
	fmt.Printf("Success import data : %d", result.NumberLoadedRows)
}

// batch_load_file imports the contents of file into the table described by
// load using Stream Load. Failures are logged; nothing is sent when the file
// cannot be read.
func batch_load_file(load StreamLoad, file string) {
	content, err := os.ReadFile(file)
	if err != nil {
		log.Println("Failed to Read the File", file, err)
		return
	}
	result, err := streamLoad(load, bytes.NewReader(content))
	if err != nil {
		log.Println("stream load failed:", err)
		return
	}
	reportLoadResult(result)
}

// batch_load_data imports the in-memory CSV text data into the table described
// by load using Stream Load. Failures are logged.
func batch_load_data(load StreamLoad, data string) {
	result, err := streamLoad(load, strings.NewReader(data))
	if err != nil {
		log.Println("stream load failed:", err)
		return
	}
	reportLoadResult(result)
}

// fetchBackends asks the node described by load for its alive backends and
// returns their "ip:http_port" addresses as strings.
func fetchBackends(load StreamLoad) (*list.List, error) {
	req, err := http.NewRequest(http.MethodGet, baseAddress(load)+"/api/backends?is_alive=true", nil)
	if err != nil {
		return nil, fmt.Errorf("building backend list request: %w", err)
	}
	req.Header.Add("Authorization", "basic "+auth(load))

	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("sending backend list request: %w", err)
	}
	defer resp.Body.Close()

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading backend list response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("backend list request returned HTTP %d", resp.StatusCode)
	}

	var backends Backend
	if err := json.Unmarshal(raw, &backends); err != nil {
		return nil, fmt.Errorf("decoding backend list response: %w", err)
	}

	bes := list.New()
	for _, be := range backends.Data.Backends {
		// JoinHostPort brackets IPv6 addresses; IPv4 output is plain "ip:port".
		bes.PushBack(net.JoinHostPort(be.IP, strconv.Itoa(be.HTTPPort)))
	}
	return bes, nil
}

// get_doris_be_list returns the "ip:http_port" addresses of the alive BE nodes
// reported by the default frontend, using the root account with an empty
// password. On any failure the error is logged and an empty, non-nil list is
// returned.
func get_doris_be_list() *list.List {
	load := StreamLoad{userName: "root", password: ""}
	bes, err := fetchBackends(load)
	if err != nil {
		log.Println("fetching Doris backend list:", err)
		return list.New()
	}
	return bes
}

// ResponseBody is the JSON document returned by a Stream Load request.
type ResponseBody struct {
	TxnID                  int    `json:"TxnId"`
	Label                  string `json:"Label"`
	Status                 string `json:"Status"`
	Message                string `json:"Message"`
	NumberTotalRows        int    `json:"NumberTotalRows"`
	NumberLoadedRows       int    `json:"NumberLoadedRows"`
	NumberFilteredRows     int    `json:"NumberFilteredRows"`
	NumberUnselectedRows   int    `json:"NumberUnselectedRows"`
	LoadBytes              int    `json:"LoadBytes"`
	LoadTimeMs             int    `json:"LoadTimeMs"`
	BeginTxnTimeMs         int    `json:"BeginTxnTimeMs"`
	StreamLoadPutTimeMs    int    `json:"StreamLoadPutTimeMs"`
	ReadDataTimeMs         int    `json:"ReadDataTimeMs"`
	WriteDataTimeMs        int    `json:"WriteDataTimeMs"`
	CommitAndPublishTimeMs int    `json:"CommitAndPublishTimeMs"`
	ErrorURL               string `json:"ErrorURL"`
}

// Backend is the JSON document returned by the backend list request.
type Backend struct {
	Msg  string `json:"msg"`
	Code int    `json:"code"`
	Data struct {
		Backends []struct {
			IP       string `json:"ip"`
			HTTPPort int    `json:"http_port"`
			IsAlive  bool   `json:"is_alive"`
		} `json:"backends"`
	} `json:"data"`
	Count int `json:"count"`
}

func main() {
	var load StreamLoad
	load.userName = "root"
	load.password = ""

	// Print the encoded credentials:
	//auth_info := auth(load)
	//fmt.Println(auth_info)

	// List the alive BE nodes:
	//backends := get_doris_be_list()
	//for e := backends.Front(); e != nil; e = e.Next() {
	//	fmt.Println(e.Value)
	//}

	data := "10001, Zhang ***, Xi'an ,30,1,133****760, Shaanxi Province **********,2021-03-12 12:34:12"
	batch_load_data(load, data)

	// Load from a CSV file instead:
	//batch_load_file(load, "/Users/zhangfeng/Downloads/test.csv")
}
边栏推荐
- mysql_ Master slave synchronization_ Show slave status details
- [stm32f103rct6] can communication
- Wechat sports field reservation of the finished works of the applet graduation project (6) opening defense ppt
- Openlayers ol ext: Transform object, rotate, stretch, zoom in
- Learning record XIII
- Openlayers draw deletes the last point when drawing
- Dynamic planning of force buckle punch in summary
- Clothing ERP | ten advantages of clothing ERP for enterprises
- JS written test question -- prototype, new, this comprehensive question
- Backtracking to solve subset problem
猜你喜欢

Use of stm32cubemonitor Part II - historical data storage and network access

Wechat sports field reservation of applet completion works applet graduation design (8) graduation design thesis template

JS foundation -- data

PHP record

Consistent hash, virtual node, bloom filter

Riotboard development board series notes (VIII) -- building desktop system

Learning notes - talking about the data structure and algorithm of MySQL index and the introduction of index

Wechat sports field reservation of the finished works of the applet graduation project (7) mid-term inspection report

Define macros in makefile and pass them to source code

Keil compile download error: no algorithm found for: 08000000h - 08001233h solution
随机推荐
Recursive and non recursive methods are used to realize the first order, middle order and second order traversal of binary tree respectively
Openlayers draw circles and ellipses
Bubble sort / heap sort
Riotboard development board series notes (V) -- porting u-boot
How to use two stacks to simulate the implementation of a queue
Threat report in June: new bank malware malibot poses a threat to mobile banking users
Dc-2-range practice
JS interview question - what is the difference between Es5 and ES6?
Can bus baud rate setting of stm32cubemx
The difference between abstract classes and interfaces
MySQL configuration in CDH installation
DOM node type
Error: tomee required to support ear/ejb deployment
Decoding webp static pictures using libwebp
hello csdn
Define macros in makefile and pass them to source code
Backtracking to solve combinatorial problems
Daily three questions 7.19
Print the common part of two ordered linked lists
Arduino + si5351 square wave generator