Customize a Flink ES source
2022-07-23 07:51:00 【Wu Nian】
1、 Requirement
Incrementally import Elasticsearch data into Kafka.
2、 Candidate solutions
1) Customize a Flume ES source
2) Use Spark's ES RDD
3) Customize a Flink ES source
3、 Solving the problem
1) Idea: every document in ES carries a send time (sendTime), i.e. the time it was written to ES, and we collect incrementally by this field. We use the ES transport API, with the scroll API for pagination, so we build a custom ES source: extend SourceFunction and do the collection inside its run method.
2) Caveat
What if the program crashes? How do we know which time ranges we have already collected?
My approach: collect in 5-minute windows, and for every window record the number of documents collected, the ES index, and the window's time range. A successful window is written to a MySQL table, and a failure is recorded as a failure too. If a window fails with an exception it is collected again; after three consecutive failures the program exits so the cause can be investigated before a restart. On restart, the position of the last acquisition is read from MySQL and collection resumes from the next record, as sketched below.
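To make that restart path concrete, here is a minimal sketch of reading the last successful window back from MySQL. The table and column names are assumptions for illustration only (mirroring the EsExportRecord fields used later); in the project this goes through EsExportRecordDAO, whose code is not shown in this post.

import java.sql.DriverManager

// Hypothetical schema: es_export_record(from_date, to_date, count, start_time,
// end_time, status, index_name), inferred from the EsExportRecord fields below.
def lastCollectedToDate(index: String): Option[String] = {
  val conn = DriverManager.getConnection("jdbc:mysql://mysql-host:3306/es_export", "user", "pwd")
  try {
    val stmt = conn.prepareStatement(
      "SELECT to_date FROM es_export_record " +
        "WHERE index_name = ? AND status = 1 ORDER BY to_date DESC LIMIT 1")
    stmt.setString(1, index)
    val rs = stmt.executeQuery()
    // Resume from one second after this value, or from a configured start date if empty
    if (rs.next()) Some(rs.getString("to_date")) else None
  } finally conn.close()
}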
3) Code: the ES source, in Scala:
package com.rongan.source

import java.util.Date

import com.rongan.commos.{DateUtils, EsUtils, PropertiesUtil}
import com.rongan.constants.Constants
import com.rongan.dao.EsExportRecordDAO
import com.rongan.model.EsExportRecord
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.elasticsearch.search.SearchHit

import scala.util.control.Breaks.{break, breakable}

/**
 * Custom ES data source.
 *
 * @param clusterName cluster name
 * @param esNode      cluster node
 * @param esPort      ES transport port
 * @param index       index name
 * @param type1       type
 */
class EsSource(val clusterName: String, val esNode: String, val esPort: Int, val index: String, val type1: String,
               var fromDate: String) extends SourceFunction[String] {

  // Whether the source has been cancelled
  var isRunning = true

  // ES client
  EsUtils.getClient(clusterName, esNode, esPort)

  val properties = PropertiesUtil.getProperties(Constants.PROPERTIES_PATH)

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    // Flag marking the first collection round
    var flag = true
    // Create the client
    EsUtils.getClient(clusterName, esNode, esPort)

    var toDate = fromDate
    var fromDate1 = fromDate
    var errorCount = 0

    // Start collecting
    while (isRunning) {
      // On the first round start from the given fromDate; afterwards advance the window by 5 minutes
      if (flag) {
        fromDate1 = toDate
        flag = false
      } else {
        fromDate1 = DateUtils.targetFormat(DateUtils.add5Minute(DateUtils.strToDate(fromDate1)))
      }
      toDate = DateUtils.targetFormat(DateUtils.subtraction1second(DateUtils.add5Minute(DateUtils.strToDate(fromDate1))))

      try {
        val startTime = DateUtils.targetFormat(new Date())
        println("start collection data index = " + index + " send_time (start)= " + fromDate1 +
          " send_time (end)= " + toDate + " currentTime" + startTime)

        val count: Int = collect(sourceContext, fromDate1, toDate)

        val endTime = DateUtils.targetFormat(new Date())

        // Record the successful window in MySQL (status 1 = success)
        EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, toDate, count, startTime, endTime, 1, index))
        errorCount = 0
        println("end of data collection index = " + index + " send_time (start)= " + fromDate1 +
          " send_time (end)= " + toDate + " currentTime " + endTime + " count data = " + count)

        Thread.sleep(properties.getProperty(Constants.ES_COLLECT_INTERVAL).toLong)
      } catch {
        case e: Exception =>
          e.printStackTrace()
          errorCount += 1
          println("error collecting data index = " + index + " send_time (start)= " + fromDate1 + " send_time (end)= " + toDate)
          // Record the failure (status 0); placeholder values for the fields we do not have
          EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, "00000000", 0, "00000000", "00000000", 0, index))
          // Roll the window back 5 minutes so the failed range is retried on the next round
          fromDate1 = DateUtils.targetFormat(DateUtils.subtraction5Minute(DateUtils.strToDate(fromDate1)))
          // Stop the program after three consecutive failures
          if (errorCount >= 3) {
            cancel()
          }
      }
    }
  }

  // Collect one time window of data through the scroll API and emit each hit
  def collect(sourceContext: SourceFunction.SourceContext[String], fromDate: String, toDate: String) = {
    var count = 0
    val tuple: (Array[SearchHit], String) =
      EsUtils.searchByScrollRangeQuery(index, type1, "send_time.keyword", fromDate, toDate)
    count = tuple._1.length
    for (hit <- tuple._1) {
      sourceContext.collect(hit.getSourceAsString)
    }
    var scrollID = tuple._2
    breakable {
      while (isRunning) {
        val result: (Array[SearchHit], String) = EsUtils.searchByScrollId(scrollID)
        if (result._1.length == 0) {
          break
        }
        for (hit <- result._1) {
          sourceContext.collect(hit.getSourceAsString)
        }
        count += result._1.length
        scrollID = result._2
      }
    }
    EsUtils.clearScroll(scrollID)
    count
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// kafka topic: roi-center.incident.detail.topic
object EsCollect {
}
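With the source in place, wiring it into a job that writes to Kafka is straightforward. A minimal sketch, assuming the universal Flink Kafka connector: the broker address and constructor arguments here are placeholders, and depending on your Flink version the producer class may be a versioned variant such as FlinkKafkaProducer011. The topic name comes from the comment in the source above.

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

import com.rongan.source.EsSource

object EsToKafkaJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // EsSource is not a ParallelSourceFunction, so it runs as a single task
    val stream = env.addSource(new EsSource("my-cluster", "es-node1", 9300,
      "firewall.ipc.info*", "alert", "2019-01-28 19:15:20"))
    stream.addSink(new FlinkKafkaProducer[String](
      "kafka-node1:9092", "roi-center.incident.detail.topic", new SimpleStringSchema()))
    env.execute("es-to-kafka")
  }
}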
4、 For the full project code, please leave a comment~ This is as far as it goes for now; if you have better ideas, feel free to discuss~
The EsUtils code:
package rongan.util

import org.elasticsearch.action.search.{ClearScrollResponse, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.sort.SortOrder
import rongan.business.tornado.RsdTornadoIpcDeviceEsToHbase.properties
import rongan.config.Constans

import scala.util.control.Breaks.{break, breakable}

object EsUtils {

  import java.net.InetAddress

  import org.elasticsearch.common.settings.Settings
  import org.elasticsearch.transport.client.PreBuiltTransportClient

  // Transport client
  var client: TransportClient = _

  def getClient(clusterName: String, host: String, port: Int) = {
    val settings: Settings = Settings.builder().put("cluster.name", clusterName).build
    client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port))
  }

  /**
   * Range query that opens a scroll context.
   *
   * @param index    index name
   * @param `type`   type name
   * @param field    field the range is applied to
   * @param fromData lower bound
   * @param toData   upper bound
   * @return the first page of hits together with the scrollId
   */
  def searchByScrollRangeQuery(index: String, `type`: String, field: String, fromData: Any, toData: Any) = {
    // 1. Build the search request
    val searchRequestBuilder: SearchRequestBuilder = client.prepareSearch()
    searchRequestBuilder.setIndices(index)
    searchRequestBuilder.setTypes(`type`)
    searchRequestBuilder.setScroll(new TimeValue(30000))
    // 2. Range query on the given field
    searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(field).from(fromData).to(toData)).setSize(10000)
    searchRequestBuilder.addSort("send_time.keyword", SortOrder.ASC)
    // 3. Execute the query
    val searchResponse: SearchResponse = searchRequestBuilder.get
    // 4. Get the scrollId
    val scrollId: String = searchResponse.getScrollId
    // Return this page of hits together with the scrollId
    val searchHits: Array[SearchHit] = searchResponse.getHits.getHits
    (searchHits, scrollId)
  }

  /**
   * Fetch one page of results for a scrollId.
   *
   * @param scrollId1 scroll id returned by the previous request
   * @return the next page of hits and the new scrollId
   */
  def searchByScrollId(scrollId1: String): (Array[SearchHit], String) = {
    if (scrollId1 == null) {
      return (Array[SearchHit](), null)
    }
    val searchScrollRequestBuilder = client.prepareSearchScroll(scrollId1)
    // Renew the scroll keep-alive
    searchScrollRequestBuilder.setScroll(new TimeValue(30000))
    val response = searchScrollRequestBuilder.get
    // Each call returns the next batch; the scroll is exhausted once hits comes back empty
    (response.getHits.getHits, response.getScrollId)
  }

  /**
   * Clear a scroll context.
   *
   * @param scrollId scroll id to clear
   */
  def clearScroll(scrollId: String): Unit = {
    if (scrollId == null) return
    val clearScrollRequestBuilder = client.prepareClearScroll
    clearScrollRequestBuilder.addScrollId(scrollId)
    val response: ClearScrollResponse = clearScrollRequestBuilder.get
    response.isSucceeded
  }

  def main(args: Array[String]): Unit = {
    // The window is closed on both ends; for the next 5-minute window, roll the end back by one second
    EsUtils.getClient(properties.getProperty(Constans.ES_CLUSTER_NAME), properties.getProperty(Constans.ES_NODE),
      properties.getProperty(Constans.ES_PORT).toInt)
    var count = 0
    val tuple: (Array[SearchHit], String) = searchByScrollRangeQuery("firewall.ipc.info*",
      "alert", "send_time.keyword", "2019-01-28 19:15:20", "2019-09-28 19:15:2")
    count = tuple._1.length
    var scrollID = tuple._2
    println(count)
    for (hit <- tuple._1) {
      println(hit.getSourceAsString)
    }
    breakable {
      while (true) {
        val result: (Array[SearchHit], String) = searchByScrollId(scrollID)
        count += result._1.length
        for (hit <- result._1) {
          println(hit.getSourceAsString)
        }
        if (result._1.length == 0) {
          break
        }
        scrollID = result._2
      }
    }
    // Print the final count after the scroll is exhausted (break lands here)
    println(count)
    clearScroll(scrollID)
  }
}
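To make the closed-interval comment in main concrete, here is the window arithmetic that EsSource.run performs, assuming the project's DateUtils helpers behave as their names suggest (DateUtils itself is project code not shown in this post):

import com.rongan.commos.DateUtils

val from1 = "2019-01-28 19:15:20"
// End of the first window: add 5 minutes, then roll back one second
val to1 = DateUtils.targetFormat(DateUtils.subtraction1second(DateUtils.add5Minute(DateUtils.strToDate(from1))))
// to1 == "2019-01-28 19:20:19", so [19:15:20, 19:20:19] is closed on both ends
val from2 = DateUtils.targetFormat(DateUtils.add5Minute(DateUtils.strToDate(from1)))
// from2 == "2019-01-28 19:20:20": the next window starts right after the previous end,
// so adjacent windows neither overlap nor leave a gap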