Flink batch writing to ES
2022-07-23 07:51:00 【Wu Nian】
Flink consumes Kafka data in real time, processes it (enrichment, cleaning, and so on), and writes the result to ES. This is a very common scenario in stream computing.
This article uses ES's bulk API via `BulkProcessor`, which is based on `TransportClient` and the TCP transport protocol; the REST alternative uses `RestClient` over HTTP, whose result accuracy the author could not guarantee. (Note: `TransportClient` is deprecated in Elasticsearch 7.x and removed in 8.0, so this approach targets older clusters.)
1. Dependencies
The main dependencies:
```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```
The main class:
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qianxin.ida.enrich.ElasticsearchSink;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

@Slf4j
public class KafkaToEs {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        try {
            env.enableCheckpointing(10000);
            // Topic list; with multiple topics, build one source per topic
            String[] topics = new String[]{"topic1", "topic2"};
            Arrays.stream(topics).forEach(topic -> {
                // Kafka connection settings (bootstrap.servers, group.id, ...) go in the Properties
                SingleOutputStreamOperator<JSONObject> dataStream = env
                        .addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), new Properties())
                                .setStartFromLatest())
                        .map(new MapFunction<String, JSONObject>() {
                            // Concrete data-cleaning logic goes here; for now just parse the raw message
                            @Override
                            public JSONObject map(String value) throws Exception {
                                return JSON.parseObject(value);
                            }
                        });
                dataStream.print();
                // Custom sink
                dataStream.addSink(new ElasticsearchSink());
            });
            env.execute("kafka2es");
        } catch (Exception e) {
            log.error("kafka2es failed", e);
        }
    }
}
```
The `ElasticsearchSink` implementation:

```java
package com.qianxin.ida.enrich;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;

import static com.qianxin.ida.utils.Constants.INDEX_SUFFIX;

@Slf4j
public class ElasticsearchSink extends RichSinkFunction<JSONObject> {

    private static BulkProcessor bulkProcessor = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // BulkProcessor is a thread-safe batching class; it flushes a new bulk
        // request whenever one of the thresholds configured below is reached.
        Settings settings = Settings.builder()
                .put("cluster.name", "elasticsearch")
                .put("client.transport.sniff", false)
                .build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
        BulkProcessor.Listener listener = buildListener();
        BulkProcessor.Builder bulk = BulkProcessor.builder(client, listener);
        // Property is the project's config helper (not shown here)
        // Flush after this many operations have been added (default 1000; -1 disables)
        bulk.setBulkActions(Property.getIntValue("bulk_actions"));
        // Flush once the buffered operations reach this size (default 5 MB; -1 disables)
        bulk.setBulkSize(new ByteSizeValue(Property.getLongValue("bulk_size"), ByteSizeUnit.MB));
        // Number of concurrent bulk requests allowed (default 1; 0 means a single synchronous request)
        bulk.setConcurrentRequests(Property.getIntValue("concurrent_request"));
        // Flush any pending requests once this interval has elapsed (not set by default)
        bulk.setFlushInterval(TimeValue.timeValueSeconds(Property.getLongValue("flush_interval")));
        // Constant backoff: wait time_wait seconds, retry up to retry_times times
        bulk.setBackoffPolicy(BackoffPolicy
                .constantBackoff(TimeValue.timeValueSeconds(Property.getLongValue("time_wait")),
                        Property.getIntValue("retry_times")));
        bulkProcessor = bulk.build();
        super.open(parameters);
    }

    private static BulkProcessor.Listener buildListener() {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            }
        };
    }

    @Override
    public void invoke(JSONObject jsonObject, Context context) throws Exception {
        try {
            String topic = jsonObject.getString("topic");
            String index = "index_";
            bulkProcessor.add(new IndexRequest(index)
                    .type(topic)
                    .source(jsonObject));
        } catch (Exception e) {
            log.error("sink error {}, message: {}", e.getMessage(), jsonObject);
        }
    }
}
```
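The `Property.getIntValue(...)` / `Property.getLongValue(...)` calls above read the tuning knobs from an external configuration. The key names below come from the code; the values are illustrative defaults only, not the author's actual settings:

```properties
# number of operations per bulk request (-1 disables this trigger)
bulk_actions=1000
# bulk request size in MB (-1 disables this trigger)
bulk_size=5
# concurrent bulk requests in flight (0 = synchronous)
concurrent_request=1
# flush pending requests every N seconds
flush_interval=10
# constant backoff: wait time_wait seconds, retry retry_times times
time_wait=1
retry_times=3
```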
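To make the backoff setting concrete: `BackoffPolicy.constantBackoff(wait, retries)` re-submits a rejected bulk request after a fixed wait, up to the given number of retries. A minimal, self-contained sketch of that retry pattern in plain Java (hypothetical names; this is not the ES implementation):

```java
import java.util.concurrent.Callable;

public class ConstantBackoffDemo {

    // Run a task, retrying up to maxRetries times with a fixed sleep between
    // attempts; rethrows the last failure if all attempts are exhausted.
    static <T> T retryWithConstantBackoff(Callable<T> task, long waitMillis, int maxRetries) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) Thread.sleep(waitMillis);
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated bulk submission: rejected twice, succeeds on the third try
        String result = retryWithConstantBackoff(() -> {
            if (++calls[0] < 3) throw new RuntimeException("rejected");
            return "ok";
        }, 10, 3);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With `setConcurrentRequests(0)` the retry happens synchronously inside the flush; with a value above 0 it happens on the bulk thread, which is why the listener callbacks are the only place to observe final failures.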