当前位置:网站首页>Flink advanced features and new features (VIII)
Flink advanced features and new features (VIII)
2022-07-24 13:45:00 【Hua Weiyun】
Flink Advanced Features and New Features (VIII)

BroadcastState: state management
- Broadcast state: the state of a broadcast variable

Application scenarios
Dynamically updating association rules and looking up specified data (for example, resolving an IP address to longitude and latitude, then calling a map API to get the province, city, district, and street).
demand
In a real-time Flink DataStream job, keep only the users that appear in the configuration (database), and complete the basic information of those users in the event stream.
Demand process

- Development steps
package cn.itcast.flink.broadcast;import org.apache.flink.api.common.state.BroadcastState;import org.apache.flink.api.common.state.MapStateDescriptor;import org.apache.flink.api.common.state.ReadOnlyBroadcastState;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple4;import org.apache.flink.api.java.tuple.Tuple6;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.BroadcastStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.util.Collector;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.Random;/** * Author itcast * Date 2021/6/24 8:29 * Two data streams 1. Flow of events 2. User configuration flow 3.connect Associated operations 4. Printout 5. 
Perform tasks * <String,String,String,Integer></> * {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1} * {"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1} * <String, String, Integer ></> * 'user_2', ' Li Si ', 20 * The final data flow 6 individual Tuple6<String,String,String,Integer,String,Integer></> * (user_3,2019-08-17 12:19:47,browse,1, Wang Wu ,33) * (user_2,2019-08-17 12:19:48,click,1, Li Si ,20) */public class BroadcastStateDemo { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Set parallelism env.setParallelism(1); //2.source //-1. Build real-time data event flow - Custom random //<userID, eventTime, eventType, productID> DataStreamSource<Tuple4<String, String, String, Integer>> clickSource = env.addSource(new MySource()); //-2. Build configuration flow - from MySQL //< user id,< full name , Age >> DataStreamSource<Map<String, Tuple2<String, Integer>>> configSource = env.addSource(new MySQLSource()); //3.transformation //-1. Define the state descriptor //MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor = //new MapStateDescriptor<>("config",Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT))); MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> broadcastDesc = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT))); //-2. Broadcast configuration flow //BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor); BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configSource.broadcast(broadcastDesc); //-3. 
Connect the event stream to the broadcast stream //BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS); SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = clickSource.connect(broadcastDS) //-4. Process the connected stream - Complete the user information in the event flow according to the configuration flow .process(new BroadcastProcessFunction<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>() { @Override public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception { // Read out f0 by userId // Read users in the event stream userId String userId = value.f0; // from ctx Environment variables through desc Read out the broadcast status ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc); // If the broadcast status is not empty ,get(null) Get out The configuration data Tuple2 if (broadcastState != null) { Map<String, Tuple2<String, Integer>> map = broadcastState.get(null); // Judge map If it is not empty if (map != null) { Tuple2<String, Integer> stringIntegerTuple2 = map.get(userId); // Take out the name and age //collect collect Tuple6 //3-4. 
Handle (process) Flow after connection - Complete the user information in the event flow according to the configuration flow ,Tuple4 and Tuple2 Merge // Handle every element ,processElement out.collect(Tuple6.of( userId, value.f1, value.f2, value.f3, stringIntegerTuple2.f0, stringIntegerTuple2.f1 )); } } } //value Namely MySQLSource The latest information obtained at regular intervals in map data // First, obtain the historical broadcast status according to the status descriptor ctx.getBroadcastState(desc) @Override public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception { // Then clear the historical status broadcastState data BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc); // Finally, put the latest broadcast stream data into state in ( Update status data ) broadcastState.put(null,value) broadcastState.clear(); broadcastState.put(null, value); } }); // Deal with elements in the broadcast //4.sinks result.print(); //5.execute env.execute(); } /** * <userID, eventTime, eventType, productID> */ public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> { private boolean isRunning = true; @Override public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception { Random random = new Random(); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); while (isRunning){ int id = random.nextInt(4) + 1; String user_id = "user_" + id; String eventTime = df.format(new Date()); String eventType = "type_" + random.nextInt(3); int productId = random.nextInt(4); ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId)); Thread.sleep(500); } } @Override public void cancel() { isRunning = false; } } /** * < user id,< full name , Age >> */ public static class MySQLSource extends RichSourceFunction<Map<String, 
Tuple2<String, Integer>>> { private boolean flag = true; private Connection conn = null; private PreparedStatement ps = null; private ResultSet rs = null; @Override public void open(Configuration parameters) throws Exception { conn = DriverManager.getConnection("jdbc:mysql://node3:3306/bigdata?useSSL=false", "root", "123456"); String sql = "select `userID`, `userName`, `userAge` from `user_info`"; ps = conn.prepareStatement(sql); } @Override public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception { while (flag){ Map<String, Tuple2<String, Integer>> map = new HashMap<>(); ResultSet rs = ps.executeQuery(); while (rs.next()){ String userID = rs.getString("userID"); String userName = rs.getString("userName"); int userAge = rs.getInt("userAge"); //Map<String, Tuple2<String, Integer>> map.put(userID, Tuple2.of(userName,userAge)); } ctx.collect(map); Thread.sleep(5000);// every other 5s Update the user's configuration information ! } } @Override public void cancel() { flag = false; } @Override public void close() throws Exception { if (conn != null) conn.close(); if (ps != null) ps.close(); if (rs != null) rs.close(); } }}- Real time data flow and Configuration flow in a dynamically changing database Conduct connect operation , Printout
边栏推荐
- R语言epiDisplay包的kap函数计算Kappa统计量的值(总一致性、期望一致性)、对多个评分对象的结果进行一致性分析、评分的类别为多个类别、如果评分中包含缺失值则标准误及其相关统计量则无法计算
- Adjust the array order so that odd numbers precede even numbers
- 脑注意力机制启发的群体智能协同避障方法
- 一些简单命令
- 基于典型相关分析的多视图学习方法综述
- Some simple commands
- Interface document evolution atlas, some ancient interface document tools, you may not have used them
- Network security - filtering bypass injection
- Kunyu(坤舆) 安装 详解
- CSP2021 T3 回文
猜你喜欢
随机推荐
Simulate the implementation of the library function memcpy-- copy memory blocks. Understand memory overlap and accurate replication in detail
在LNMP架构中搭建Zabbix监控服务
Flink综合案例(九)
R language test sample proportion: use the prop.test function to perform a single sample proportion test to calculate the confidence interval of the p value of the successful sample proportion in the
交换机链路聚合详解【华为eNSP】
Flink容错机制(五)
网络安全——Web信息收集
使用Activiti创建数据库表报错,
Adjust the array order so that odd numbers precede even numbers
Chapter VI bus
Rhcsa sixth note
Aggregation measurement of robot swarm intelligence based on group entropy
网络安全——文件上传黑名单绕过
Dtcloud uses custom fonts
关于不定方程解的个数的问题
Network security - filtering bypass injection
如何生成预期数据?埃默里大学等最新《深度学习可控数据生成》综述,52页pdf涵盖346篇文献全面阐述可控生成技术体系
Paper notes: swing UNET: UNET like pure transformer for medicalimage segmentation
JQ remove an element style
Sringboot-plugin-framework 实现可插拔插件服务








