2022-06-21-flink-49 (I. SQL manual)
2022-06-25 03:56:00
1. Dynamic tables
| SQL (batch processing) | Stream computing |
|---|---|
| A table is a bounded data set; once determined, it does not change during the computation | A stream is an unbounded data set; data is produced continuously, so the data set keeps changing during the computation |
| A query executed over batch data can access the complete input | A streaming query cannot access all of the data at startup; it must wait for data to flow in |
| Computation over a bounded data set eventually finishes and yields a result | A streaming job keeps receiving data and continuously updates its result; it never finishes |
Traditional SQL is mature and widely used, so being able to develop stream jobs with SQL brings great convenience. Applying SQL to stream computing requires fusing the two concepts of stream and table: if a bounded data set is viewed as a table, then an unbounded data set is a table into which data is continuously written over time.
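To make the stream-as-table idea concrete, here is a minimal, hedged sketch (the table name and columns are assumed for illustration, not from the original post): a query over such a table is a *continuous query* whose result is itself a dynamic table that updates as new rows stream in.

```java
// Assumed setup: a TableEnvironment `tableEnv` with a stream-backed table clicks(user_name, url).
// The GROUP BY query below never "finishes"; its result table is continuously
// updated as new click rows arrive on the stream.
Table pageViews = tableEnv.sqlQuery(
        "SELECT user_name, COUNT(url) AS pv FROM clicks GROUP BY user_name");
```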
2. The code structure of a SQL / Table API task
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class tableDemo1 {
    public static void main(String[] args) {
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode()  // declare a streaming job
                // .inBatchMode()   // declare a batch job
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);

        // create the input table
        tableEnv.executeSql("CREATE TABLE input_table(user_name STRING, url STRING, ts BIGINT) WITH (" +
                " 'connector' = 'filesystem', 'path' = 'src/main/resources/c.txt', 'format' = 'csv' )");

        // create the output table
        tableEnv.executeSql("CREATE TABLE output_table(user_name STRING, url STRING) WITH (" +
                " 'connector' = 'filesystem', 'path' = 'output/2022-06-21', 'format' = 'csv' )");

        // query with the Table API
        Table table_API = tableEnv.from("input_table")
                .where($("user_name").isEqual("millet"))
                .select($("user_name"), $("url"));

        // the same query with SQL
        Table table_SQL = tableEnv.sqlQuery(
                "SELECT user_name, url FROM input_table WHERE user_name = 'millet'");

        // write the results into the output table
        table_API.executeInsert("output_table");
        table_SQL.executeInsert("output_table");
    }
}
```
SQL context: TableEnvironment

The TableEnvironment is responsible for (a minimal sketch follows this list):

- Registering tables in the internal catalog; the catalog can be understood as Flink's metastore, playing the same role as the metastore does for Hive
- Registering external catalogs
- Executing SQL queries
- Registering user-defined (scalar, table, or aggregate) functions
- Converting a DataStream or DataSet (deprecated) into a Table
- Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
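A minimal sketch of these responsibilities (the class, function, and view names are made up for illustration, not from the original post):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class TableEnvSketch {
    // a trivial user-defined scalar function
    public static class ToUpper extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the table environment holds a reference to the StreamExecutionEnvironment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // register a user-defined scalar function
        tableEnv.createTemporarySystemFunction("to_upper", ToUpper.class);

        // convert a DataStream into a table registered in the catalog
        tableEnv.createTemporaryView("words", env.fromElements("flink", "sql"));

        // execute a SQL query that uses the registered function
        tableEnv.sqlQuery("SELECT to_upper(f0) FROM words").execute().print();
    }
}
```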
Table concepts in SQL

The full (identifying) name of a table consists of three parts: catalog name.database name.table name. If the catalog or database name is not specified, the current defaults (`default_catalog` and `default_database`) are used. The table created by the following SQL therefore has the full name `default_catalog.nono.output_table`:
```java
tableEnv.executeSql("CREATE TABLE nono.output_table(user_name STRING, url STRING) WITH (" +
        " 'connector' = 'filesystem', 'path' = 'output/2022-06-21', 'format' = 'csv' )");
```
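The current catalog and database can also be switched explicitly, so that unqualified names resolve against them. A sketch, assuming a catalog named `my_catalog` containing a database `nono` has already been registered:

```java
// switch the current catalog and database (both assumed to exist already)
tableEnv.useCatalog("my_catalog");
tableEnv.useDatabase("nono");

// from here on, the unqualified name resolves to my_catalog.nono.output_table
tableEnv.executeSql("CREATE TABLE output_table(user_name STRING, url STRING) WITH (" +
        " 'connector' = 'filesystem', 'path' = 'output/2022-06-21', 'format' = 'csv' )");
```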
Tables can be regular (external) tables or virtual views:

- External table: describes external data, such as a file or a message queue
- View: created from an existing table; a view is generally the result of a logical SQL query. This matches offline Hive SQL, where views are likewise created from existing tables (see the sketch after this list)
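As a small illustration (reusing the `input_table` from the example above), a view can be created directly in SQL; it stores no data and simply names a logical query result:

```java
// a virtual view over an existing table; queries against it re-run the underlying logic
tableEnv.executeSql(
        "CREATE VIEW user_url_view AS" +
        " SELECT user_name, url FROM input_table WHERE user_name = 'millet'");
```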
A SQL query example

See also: "A chat about the random data generator in Flink 1.11 - the DataGen connector"
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class tableDemo2 {
    public static void main(String[] args) {
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode()  // declare a streaming job
                // .inBatchMode()   // declare a batch job
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);

        // create a random mock data source with the DataGen connector
        tableEnv.executeSql("CREATE TABLE sku_price_table( tag STRING, sku_id INT, price INT," +
                " ts AS localtimestamp, WATERMARK FOR ts AS ts )" +
                " WITH( 'connector' = 'datagen'," +
                " 'rows-per-second'='1'," +
                " 'fields.sku_id.kind'='sequence'," +
                " 'fields.sku_id.start'='1'," +
                " 'fields.sku_id.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.price.min'='1'," +
                " 'fields.price.max'='1000')");

        // create the Kafka sink table; upsert-kafka requires a primary key
        tableEnv.executeSql("CREATE TABLE sink_tag_agg_Table (" +
                " tag STRING," +
                " count_tag BIGINT," +
                " sum_tag BIGINT," +
                " max_tag BIGINT," +
                " PRIMARY KEY (`tag`) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'upsert-kafka'," +
                " 'topic' = 'topic05'," +
                " 'properties.bootstrap.servers' = '43.142.80.86:9093'," +
                " 'key.format' = 'json'," +
                " 'value.format' = 'json'" +
                ")");

        // continuously aggregate per tag and upsert the result into Kafka
        tableEnv.executeSql("INSERT INTO sink_tag_agg_Table" +
                " SELECT tag, count(1) AS count_tag, sum(price) AS sum_tag, max(price) AS max_tag" +
                " FROM sku_price_table GROUP BY tag");
    }
}
```
Bug: `Could not find any factory for identifier 'json' that implements ...`

This is caused by a missing dependency; add the flink-json dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
```
Conversion between SQL and the DataStream API

Functionality that cannot be expressed in SQL can be implemented with the DataStream API. The following implements a log-based alerting function:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class tableDemo3 {
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // create a random mock data source
        tableEnv.executeSql("CREATE TABLE tag_money_table( tag STRING, id INT, money INT," +
                " ts AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3))," +
                " WATERMARK FOR ts AS ts - INTERVAL '1' SECOND)" +
                " WITH( 'connector' = 'datagen'," +
                " 'rows-per-second'='1'," +
                " 'fields.id.kind'='sequence'," +
                " 'fields.id.start'='1'," +
                " 'fields.id.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.money.min'='100'," +
                " 'fields.money.max'='10000')");

        // use the CUMULATE windowing TVF to build a cumulative window
        Table sqlQuery = tableEnv.sqlQuery("SELECT UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000," +
                " window_end, COUNT(DISTINCT id), SUM(money) AS money" +
                " FROM TABLE(CUMULATE(TABLE tag_money_table, DESCRIPTOR(ts)," +
                " INTERVAL '5' SECOND, INTERVAL '30' SECOND))" +
                " GROUP BY window_start, window_end");

        // create a Logger
        Logger logger = LoggerFactory.getLogger(tableDemo3.class);

        // convert the table into a stream and raise an alert when the windowed sum is large enough
        tableEnv.toDataStream(sqlQuery, Row.class).flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public void flatMap(Row value, Collector<String> out) throws Exception {
                long l = Long.parseLong(String.valueOf(value.getField("money")));
                if (l >= 10000L) {
                    logger.info("...... alert ......");
                    logger.info("-------");
                }
                out.collect(value.toString());
            }
        }).print();
        env.execute();
    }
}
```
To see the log output, configure log4j.properties and place the file under src/main/resources:

```properties
### root logger ###
log4j.rootLogger = debug,stdout,D,E

### output messages to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### write logs at DEBUG level and above to /home/hao/Desktop/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = /home/hao/Desktop/error.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n

### write logs at ERROR level and above to /home/hao/Desktop/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = /home/hao/Desktop/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
```
Date and time types

- DATE: a date type consisting of year-month-day, without time-zone semantics; value range [0000-01-01, 9999-12-31]
- TIME, TIME(p): a time type without a time zone, of the form hour:minute:second[.fractional] with up to nanosecond precision; range 00:00:00.000000000 to 23:59:59.999999999. Compared with the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported; semantically it is closer to java.time.LocalTime. No time-with-time-zone type is provided. p is the number of fractional digits of the second (the precision) and must be between 0 and 9 (inclusive); if no precision is specified, p equals 0
- TIMESTAMP, TIMESTAMP(p): a timestamp type without a time zone, of the form year-month-day hour:minute:second[.fractional] with up to nanosecond precision; range 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999. Compared with the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported; semantically it is closer to java.time.LocalDateTime. p is the number of fractional digits of the second (the precision) and must be between 0 and 9 (inclusive); if no precision is specified, p equals 6. TIMESTAMP(p) WITHOUT TIME ZONE is equivalent to this type
- TIMESTAMP WITH TIME ZONE, TIMESTAMP(p) WITH TIME ZONE: a timestamp type with a time zone, of the form year-month-day hour:minute:second[.fractional] zone with up to nanosecond precision; range 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59. p is the number of fractional digits of the second (the precision) and must be between 0 and 9 (inclusive); if no precision is specified, p equals 6
- TIMESTAMP_LTZ, TIMESTAMP_LTZ(p): a time type of the form year-month-day hour:minute:second[.fractional] zone, with local time-zone semantics; value range [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]. p is the number of fractional second digits, in the range [0, 9]; if p is not specified, it defaults to 6
- INTERVAL YEAR TO MONTH: a group of data types for Year-Month intervals, ranging from -9999-11 to +9999-11, which can express:

```sql
INTERVAL YEAR
INTERVAL YEAR(p)
INTERVAL YEAR(p) TO MONTH
INTERVAL MONTH
```

where p is the number of digits of the year (year precision); p must be between 1 and 4 (inclusive). If the year precision is not specified, p equals 2.
| Interval literal | Meaning |
|---|---|
| INTERVAL '3' YEAR | an interval of 3 years |
| INTERVAL '3' MONTH | an interval of 3 months |
| INTERVAL '3-4' YEAR TO MONTH | an interval of 3 years and 4 months |
- INTERVAL DAY TO SECOND: a group of data types for Day-Time intervals. The interval takes the form +days hours:minutes:seconds.fractional, ranging from -999999 23:59:59.999999999 to +999999 23:59:59.999999999, and can express:

```sql
INTERVAL DAY
INTERVAL DAY(p1)
INTERVAL DAY(p1) TO HOUR
INTERVAL DAY(p1) TO MINUTE
INTERVAL DAY(p1) TO SECOND(p2)
INTERVAL HOUR
INTERVAL HOUR TO MINUTE
INTERVAL HOUR TO SECOND(p2)
INTERVAL MINUTE
INTERVAL MINUTE TO SECOND(p2)
INTERVAL SECOND
INTERVAL SECOND(p2)
```

where p1 is the number of digits of the day (day precision) and p2 is the number of fractional digits of the second (fractional precision). p1 must be between 1 and 6 (inclusive) and p2 between 0 and 9 (inclusive). If p1 is not specified, it defaults to 2; if p2 is not specified, it defaults to 6.
| Interval literal | Meaning |
|---|---|
| INTERVAL '3' DAY | an interval of 3 days |
| INTERVAL '2' HOUR | an interval of 2 hours |
| INTERVAL '25' MINUTE | an interval of 25 minutes |
| INTERVAL '45' SECOND | an interval of 45 seconds |
| INTERVAL '3 02' DAY TO HOUR | an interval of 3 days and 2 hours |
| INTERVAL '3 02:25' DAY TO MINUTE | an interval of 3 days, 2 hours, and 25 minutes |
| INTERVAL '3 02:25:45' DAY TO SECOND | an interval of 3 days, 2 hours, 25 minutes, and 45 seconds |
| INTERVAL '02:25' HOUR TO MINUTE | an interval of 2 hours and 25 minutes |
| INTERVAL '02:25:45' HOUR TO SECOND | an interval of 2 hours, 25 minutes, and 45 seconds |
| INTERVAL '25:45' MINUTE TO SECOND | an interval of 25 minutes and 45 seconds |
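A hedged sketch tying these types together (assuming the `tableEnv` from the earlier examples): TIMESTAMP_LTZ values are rendered in the session time zone, which is configurable, and interval literals participate directly in timestamp arithmetic.

```java
// TIMESTAMP_LTZ values are displayed in the configurable session time zone
tableEnv.getConfig().setLocalTimeZone(java.time.ZoneId.of("Asia/Shanghai"));

// interval literals can be added to and subtracted from timestamps
tableEnv.sqlQuery(
        "SELECT CURRENT_TIMESTAMP AS now_ltz," +
        " CURRENT_TIMESTAMP + INTERVAL '3' DAY AS in_three_days," +
        " CURRENT_TIMESTAMP - INTERVAL '02:25' HOUR TO MINUTE AS earlier"
).execute().print();
```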
3. Converting a dynamic table into output data

Tables in a database support 3 kinds of DML behavior: INSERT, UPDATE, and DELETE. Since a dynamic table is conceptually similar to a database table, it also supports these 3 DML operations, with some differences.

In Flink, dynamic tables are divided into 3 types:

1) Tables with only update behavior: one or more rows that are continuously updated.
2) Tables with only insert behavior: insert-only tables without UPDATE or DELETE changes.
3) Tables with both insert and update behavior.

When a dynamic table is converted into a stream or written to an external system, its changes (modifications) need to be translated into behavior on the stream. These 3 types of dynamic tables correspond to different types of data streams, and Flink's Table API & SQL supports 3 ways of encoding the changes (modifications) on a dynamic table.
(1) Append stream

An append stream supports only append write behavior: only dynamic tables with INSERT behavior are supported, with no UPDATE or DELETE changes to existing data.
(2) Retract stream

A retract stream contains two kinds of messages: add messages and retract messages. The change behaviors of the dynamic table map to message types as follows.

1) An INSERT change becomes an add message on the stream.
2) A DELETE change becomes a retract message on the stream.
3) An UPDATE change becomes two messages: a retract message for the old record and an add message for the new record.
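A minimal sketch of consuming a retract stream (using `StreamTableEnvironment.toRetractStream`; the query and table names are assumed from the earlier examples): each element is a `Tuple2<Boolean, Row>`, where `true` marks an add message and `false` a retract message.

```java
// a grouped aggregation produces a table with update behavior, so it needs retract semantics
Table agg = tableEnv.sqlQuery(
        "SELECT user_name, COUNT(*) AS cnt FROM input_table GROUP BY user_name");

// Tuple2.f0: true = add message, false = retract message; Tuple2.f1: the row itself
tableEnv.toRetractStream(agg, Row.class)
        .filter(msg -> msg.f0)            // keep only the add messages
        .map(msg -> msg.f1.toString())
        .print();
```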

(3) Upsert stream

An upsert stream contains two kinds of messages: upsert messages and delete messages. A dynamic table converted into an upsert stream must have a primary key (possibly a composite one). The change behaviors of a keyed dynamic table map to message types as follows.

1) INSERT and UPDATE changes become UPSERT messages.
2) DELETE changes become delete messages.
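A hedged sketch of producing an upsert-encoded changelog with the newer `toChangelogStream` API (Flink 1.14 style; the query and key names are assumed): with `ChangelogMode.upsert()` the stream carries only upsert and delete messages, and each `Row`'s `RowKind` tells them apart.

```java
// requires: org.apache.flink.table.api.Schema, org.apache.flink.table.connector.ChangelogMode
// the unique key of the aggregation result (user_name) becomes the upsert key
Table agg = tableEnv.sqlQuery(
        "SELECT user_name, COUNT(*) AS cnt FROM input_table GROUP BY user_name");

// emit only upsert (+U) and delete (-D) messages; inspect each row's RowKind
tableEnv.toChangelogStream(
                agg,
                Schema.newBuilder().primaryKey("user_name").build(),
                ChangelogMode.upsert())
        .map(row -> row.getKind().shortString() + " " + row)
        .print();
```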

4. Two ways to specify the time attribute in SQL

To use Flink SQL's time-window operations, the source table in SQL or the Table API must provide a time attribute (that is, we declare the time attribute on the source table) and support time-related operations. Flink SQL provides two ways to specify the timestamp:

- Specify it in the CREATE TABLE DDL when the table is created
- Specify it on the DataStream and use it when subsequently converting the DataStream into a Table
Event Time
```sql
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  ts BIGINT,
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
```
```java
// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());

// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:
WindowedTable windowedTable = table.window(Tumble
        .over(lit(10).minutes())
        .on($("user_action_time"))
        .as("userActionWindow"));
```
Processing Time
```sql
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
```
```java
DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
                .on($("user_action_time"))
                .as("userActionWindow"));
```
Further reading:

- Flink Table & SQL time attributes
- Time Attributes | Apache Flink
Author: Vaguely like Mengmeng