当前位置:网站首页>Use go language to import Doris data through stream load
Use go language to import Doris data through stream load
2022-07-25 03:14:00 【Zhangjiafeng】
This article uses GO yes 1.17.2
Doris 0.15.0 release edition
Doris The data import of is available in various languages , however GO The language version is basically invisible , I learned it briefly , I wrote a simple one Stream Load Example of warehousing , For reference only
The table structure used in the example :
CREATE TABLE IF NOT EXISTS user_info( user_id LARGEINT NOT NULL COMMENT " user id", username varchar(50) NOT NULL COMMENT " user name ", city VARCHAR(20) COMMENT " User City ", age SMALLINT COMMENT " User age ", sex TINYINT COMMENT " User's gender ", phone LARGEINT COMMENT " Telephone ", address VARCHAR(500) COMMENT " Address ", register_time datetime COMMENT " User registration time ")Unique KEY(user_id, username)DISTRIBUTED BY HASH(user_id) BUCKETS 3PROPERTIES ("replication_num" = "3");
Here is GO Example code for , It supports importing from files , Import from memory data , It also provides access BE Method of node list , You can get one randomly from here when importing BE node IP And port , Direct connection BE Import
package mainimport ( "container/list" "encoding/base64" "encoding/json" "fmt" "github.com/gofrs/uuid" "io/ioutil" "log" "net/http" "strconv" "strings")type StreamLoad struct { url string dbName string tableName string data string userName string password string}// Realization Doris User authentication information func auth(load StreamLoad) string { s := load.userName + ":" + load.password b := []byte(s) sEnc := base64.StdEncoding.EncodeToString(b) fmt.Printf("enc=[%s]\n", sEnc) sDec, err := base64.StdEncoding.DecodeString(sEnc) if err != nil { fmt.Printf("base64 decode failure, error=[%v]\n", err) } else { fmt.Printf("dec=[%s]\n", sDec) } return sEnc}// Use Stream load Import file data into Doris In the corresponding data table func batch_load_file(load StreamLoad, file string) { client := &http.Client{} // Generate the url url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load" //fmt.Formatter(.Format(url,load.dbName,l)) fileContext, err := ioutil.ReadFile(file) if err != nil { log.Println("Failed to Read the File", file, err) } record := strings.NewReader(string(fileContext)) // Submit a request reqest, err := http.NewRequest(http.MethodPut, url, record) // increase header Options reqest.Header.Add("Authorization", "basic "+auth(load)) reqest.Header.Add("EXPECT", "100-continue") var u1 = uuid.Must(uuid.NewV4()) reqest.Header.Add("label", u1.String()) reqest.Header.Add("column_separator", ",") if err != nil { panic(err) } // Processing return results response, _ := client.Do(reqest) if response.StatusCode == 200 { body, _ := ioutil.ReadAll(response.Body) responseBody := ResponseBody{} jsonStr := string(body) err := json.Unmarshal([]byte(jsonStr), &responseBody) if err != nil { fmt.Println(err.Error()) } if responseBody.Status == "Success" { // If there is filtered data , Print wrong URL if responseBody.NumberFilteredRows > 0 { fmt.Printf("Error Data : %s ", responseBody.ErrorURL) } else { fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows) } } fmt.Println(string(body)) } defer response.Body.Close()}// Memory stream data , adopt Stream Load Import Doris In the table func batch_load_data(load StreamLoad, data string) { client := &http.Client{} // Generate the url url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load" //fmt.Formatter(.Format(url,load.dbName,l)) record := strings.NewReader(data) // Submit a request reqest, err := http.NewRequest(http.MethodPut, url, record) // increase header Options reqest.Header.Add("Authorization", "basic "+auth(load)) reqest.Header.Add("EXPECT", "100-continue") var u1 = uuid.Must(uuid.NewV4()) reqest.Header.Add("label", u1.String()) reqest.Header.Add("column_separator", ",") if err != nil { panic(err) } // Processing return results response, _ := client.Do(reqest) if response.StatusCode == 200 { body, _ := ioutil.ReadAll(response.Body) responseBody := ResponseBody{} jsonStr := string(body) err := json.Unmarshal([]byte(jsonStr), &responseBody) if err != nil { fmt.Println(err.Error()) } if responseBody.Status == "Success" { // If there is filtered data , Print wrong URL if responseBody.NumberFilteredRows > 0 { fmt.Printf("Error Data : %s ", responseBody.ErrorURL) } else { fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows) } } else { fmt.Printf("Error Message : %s \n", responseBody.Message) fmt.Printf("Error Data : %s ", responseBody.ErrorURL) } //fmt.Println(jsonStr) } defer response.Body.Close()}// obtain BE list func get_doris_be_list() *list.List { var load StreamLoad load.userName = "root" load.password = "" client := &http.Client{} // Generate the url url := "http://10.220.146.10:8030/api/backends?is_alive=true" // Submit a request reqest, err := http.NewRequest("GET", url, nil) // increase header Options reqest.Header.Add("Authorization", "basic "+auth(load)) if err != nil { panic(err) } // Processing return results response, _ := client.Do(reqest) bes := list.New() if response.StatusCode == 200 { body, _ := ioutil.ReadAll(response.Body) backends := Backend{} jsonStr := string(body) err := json.Unmarshal([]byte(jsonStr), &backends) if err != nil { fmt.Println(err.Error()) } for _, beinfo := range backends.Data.Backends { be := beinfo.IP + ":" + strconv.Itoa(beinfo.HTTPPort) bes.PushBack(be) } } defer response.Body.Close() return bes}//Stream load Return the message structure type ResponseBody struct { TxnID int `json:"TxnId"` Label string `json:"Label"` Status string `json:"Status"` Message string `json:"Message"` NumberTotalRows int `json:"NumberTotalRows"` NumberLoadedRows int `json:"NumberLoadedRows"` NumberFilteredRows int `json:"NumberFilteredRows"` NumberUnselectedRows int `json:"NumberUnselectedRows"` LoadBytes int `json:"LoadBytes"` LoadTimeMs int `json:"LoadTimeMs"` BeginTxnTimeMs int `json:"BeginTxnTimeMs"` StreamLoadPutTimeMs int `json:"StreamLoadPutTimeMs"` ReadDataTimeMs int `json:"ReadDataTimeMs"` WriteDataTimeMs int `json:"WriteDataTimeMs"` CommitAndPublishTimeMs int `json:"CommitAndPublishTimeMs"` ErrorURL string `json:"ErrorURL"`}// obtain BE The list returns the structure type Backend struct { Msg string `json:"msg"` Code int `json:"code"` Data struct { Backends []struct { IP string `json:"ip"` HTTPPort int `json:"http_port"` IsAlive bool `json:"is_alive"` } `json:"backends"` } `json:"data"` Count int `json:"count"`}func main() { var load StreamLoad load.userName = "root" load.password = "" //auth_info := auth(load) //fmt.Println(auth_info) //backends := get_doris_be_list() //for e := backends.Front(); e != nil; e = e.Next() { // fmt.Println(e.Value) //} data := "10001, Zhang ***, Xi'an ,30,1,133****760, Shaanxi Province **********,2021-03-12 12:34:12" batch_load_data(load, data) //batch_load_file(/load, "/Users/zhangfeng/Downloads/test.csv")}
边栏推荐
- Three ways to solve your performance management problems
- Import and export using poi
- Resolve the error: org.apache.ibatis.binding.bindingexception
- JS written test question -- realize the flat function of array
- Use of stm32cubemonitor part I - data plotting and instrument display
- Merge sort / quick sort
- Keil compile download error: no algorithm found for: 08000000h - 08001233h solution
- List title of force buckle summary
- B. Almost Ternary Matrix
- JS foundation -- regular expression
猜你喜欢

kettle_ Configure database connection_ report errors

Stm32cubemx quadrature encoder

Use of stm32cubemonitor part I - data plotting and instrument display
![[Kali's sshd service is enabled]](/img/1b/180534d51049177254e30c4b783eba.png)
[Kali's sshd service is enabled]

Dynamic programming -- Digital DP

Riotboard development board series notes (4) -- using Vpu hardware decoding

Learning Record V

JS foundation -- math

Page performance: how to optimize pages systematically?

Dc-2-range practice
随机推荐
Wechat sports field reservation of applet completion works applet graduation design (8) graduation design thesis template
Download the jar package of jsqlparser and PageHelper
Riotboard development board series notes (6) -- buildreoot building system image
Wechat H5 record
ES6 - study notes
Leetcode programming practice -- Tencent selected 50 questions (I)
Wechat sports field reservation of the finished works of the applet graduation project (6) opening defense ppt
Selenium framework operation steelth.min.js file hides browser fingerprint features
mysql_ Account authorization permission recycling, account locking and unlocking, account creation and deletion
Stm32cubemx quadrature encoder
mysql_ Master slave synchronization_ Show slave status details
Daily three questions 7.16
Daily three questions 7.15
Unity refers to a variable in another class (its own instance)
mysql_ Case insensitive
Clothing ERP | ten advantages of clothing ERP for enterprises
Daily three questions 7.19
Dynamic programming -- Digital DP
B. Almost Ternary Matrix
Solve ''_ Xsrf 'argument missing from post