2022-06-21-flink-49 (I. SQL manual)

2022-06-25 03:56:00 Game programming

1. Dynamic table

Comparison of SQL (batch) and stream computation:

SQL | Stream computation
A table is a bounded data set; once fixed, it does not change during the computation. | A stream is an unbounded data set; data is produced continuously, so the data set keeps changing during the computation.
A query over batch data can access the complete input. | A streaming query cannot access all of the data when it starts and must wait for data to arrive.
A computation over a bounded data set eventually finishes and yields a final result. | A streaming computation keeps receiving data and keeps updating its result; it never finishes.

Traditional SQL is mature and widely used, so being able to develop stream jobs in SQL would be a great convenience. Applying SQL to stream computation requires unifying the two concepts of stream and table: if a bounded data set is treated as a table, then an unbounded data set is a table into which data is written continuously over time, that is, a dynamic table.
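
To make the idea of a table that keeps receiving rows more concrete, here is a minimal plain-Java sketch with no Flink dependency. It imitates a continuous query of the form SELECT user_name, COUNT(url) ... GROUP BY user_name, where each arriving row updates the result. The class name ContinuousQuerySketch and the sample click data are hypothetical, and this is only an illustration of the concept, not how Flink implements it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: a grouped count that is re-evaluated for every arriving row. */
final class ContinuousQuerySketch {
    // Current contents of the result (dynamic) table: user_name -> number of clicks.
    private final Map<String, Long> result = new LinkedHashMap<>();

    /** Called once per row appended to the input; returns a snapshot of the result table. */
    Map<String, Long> onRow(String userName, String url) {
        result.merge(userName, 1L, Long::sum);
        return new LinkedHashMap<>(result);
    }

    public static void main(String[] args) {
        ContinuousQuerySketch query = new ContinuousQuerySketch();
        // Each call prints the result table as it looks after that row has arrived.
        System.out.println(query.onRow("Mary", "./home"));
        System.out.println(query.onRow("Bob", "./cart"));
        System.out.println(query.onRow("Mary", "./prod?id=1"));
    }
}
```

The point of the sketch is that the query never "finishes": the result table is simply whatever the accumulated state looks like after the latest row.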

2. Code structure of a SQL / Table API job

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class tableDemo1 {
    public static void main(String[] args) {
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode() // declare a streaming job
                // .inBatchMode() // declare a batch job
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);

        // Create the input table
        tableEnv.executeSql("CREATE TABLE input_table(user_name STRING, url STRING, ts BIGINT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/c.txt', 'format' = 'csv' )");
        // Create the output table
        tableEnv.executeSql("CREATE TABLE output_table(user_name STRING, url STRING) WITH ( 'connector' = 'filesystem', 'path' = 'output/2022-06-21', 'format' = 'csv' )");

        // Query with the Table API
        Table table_API = tableEnv.from("input_table")
                .where($("user_name").isEqual("millet"))
                .select($("user_name"), $("url"));
        // The same query with SQL
        Table table_SQL = tableEnv.sqlQuery("SELECT user_name, url FROM input_table WHERE user_name = 'millet'");

        // Write both results to the output table
        table_API.executeInsert("output_table");
        table_SQL.executeInsert("output_table");
    }
}

SQL context: TableEnvironment

The TableEnvironment is responsible for:

  1. Registering tables in the internal catalog. The catalog can be thought of as Flink's metastore; it plays the same role for Flink that the Hive metastore plays for Hive.

  2. Registering external catalogs

  3. Executing SQL queries

  4. Registering user-defined (scalar, table, or aggregate) functions

  5. Converting a DataStream or DataSet (deprecated) into a table

  6. Holding a reference to the ExecutionEnvironment or StreamExecutionEnvironment

The concept of tables in SQL

A table's full name (identifier) consists of three parts: catalog name, database name, and table name. If the catalog name or database name is not specified, the current default is used; unless changed, Flink's defaults are default_catalog and default_database. The table created by the following SQL therefore has the full name default_catalog.nono.output_table (assuming the database nono exists).

tableEnv.executeSql("CREATE TABLE nono.output_table(user_name STRING ,url STRING) WITH ( 'connector' = 'filesystem', 'path' = 'output/2022-06-21', 'format' = 'csv' )");
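
The name resolution described above can be sketched in plain Java. This is a hypothetical helper (TableNameSketch.qualify is not a Flink API) that expands a partial name using the current catalog and database, which are passed in as parameters. It assumes unquoted identifiers, so names that contain dots inside backticks are not handled.

```java
/** Hypothetical helper that expands a partial table name into catalog.database.table. */
final class TableNameSketch {
    static String qualify(String name, String currentCatalog, String currentDatabase) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1: // only the table name was given
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2: // database.table was given
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3: // already fully qualified
                return name;
            default:
                throw new IllegalArgumentException("Too many name parts: " + name);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualify("nono.output_table", "default_catalog", "default_database"));
        System.out.println(qualify("input_table", "default_catalog", "default_database"));
    }
}
```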

Tables can be regular external tables or virtual views.

  1. External table: describes external data, such as files or message queues.

  2. View: created from existing tables. A view is generally the result of the logic of a SQL query. This matches offline Hive SQL, where views are likewise created from existing tables.

A SQL query example

Reference: a look at the random data generator in Flink 1.11, the DataGen connector

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class tableDemo2 {
    public static void main(String[] args) {
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode() // declare a streaming job
                // .inBatchMode() // declare a batch job
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);

        // Create a source table of random mock data
        tableEnv.executeSql("CREATE TABLE sku_price_table( tag STRING ,sku_id INT ,price INT,ts AS localtimestamp, WATERMARK FOR ts AS ts ) " +
                " WITH( 'connector' = 'datagen',  " +
                " 'rows-per-second'='1',  " +
                " 'fields.sku_id.kind'='sequence', " +
                " 'fields.sku_id.start'='1'," +
                " 'fields.sku_id.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.price.min'='1'," +
                " 'fields.price.max'='1000')");

        // Create the sink table that writes to Kafka
        tableEnv.executeSql("CREATE TABLE sink_tag_agg_Table (" +
                " tag STRING, " +
                " count_tag BIGINT, " +
                " sum_tag BIGINT , " +
                " max_tag BIGINT, " +
                " PRIMARY KEY (`tag`) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = 'topic05', " +
                " 'properties.bootstrap.servers' = '43.142.80.86:9093', " +
                " 'key.format' = 'json' ," +
                " 'value.format' = 'json'" +
                ")");

        // Aggregate per tag and continuously upsert the result into Kafka
        tableEnv.executeSql("INSERT INTO  sink_tag_agg_Table SELECT tag, count(1) as count_tag, sum(price) as sum_tag,max(price) as max_tag FROM sku_price_table GROUP BY tag");
    }
}

Error: Could not find any factory for identifier 'json' that implements
Cause: a dependency is missing. Add the flink-json dependency:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

Converting between SQL and the DataStream API

Functionality that SQL cannot express can be implemented with the DataStream API. The following example implements a log-based alert.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class tableDemo3 {
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create a source table of random mock data
        tableEnv.executeSql("CREATE TABLE tag_money_table( tag STRING ,id INT ,money INT,ts AS CAST(CURRENT_TIMESTAMP AS timestamp_LTZ(3)), WATERMARK FOR ts AS ts - INTERVAL '1' SECOND) " +
                " WITH( 'connector' = 'datagen',  " +
                " 'rows-per-second'='1',  " +
                " 'fields.id.kind'='sequence', " +
                " 'fields.id.start'='1'," +
                " 'fields.id.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.money.min'='100'," +
                " 'fields.money.max'='10000')");

        // Use a window TVF to implement a cumulative (CUMULATE) window
        Table sqlQuery = tableEnv.sqlQuery("SELECT UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 ,window_end , COUNT(DISTINCT id ) ,SUM(money) as money FROM TABLE(CUMULATE(TABLE tag_money_table ,DESCRIPTOR(ts) ,INTERVAL '5' SECOND ,INTERVAL '30' SECOND  )) GROUP BY window_start ,window_end ");

        // Create the Logger object
        Logger logger = LoggerFactory.getLogger(tableDemo3.class);

        // Convert the table to a stream and log an alert when the windowed sum is large
        tableEnv.toDataStream(sqlQuery, Row.class).flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public void flatMap(Row value, Collector<String> out) throws Exception {
                long l = Long.parseLong(String.valueOf(value.getField("money")));
                if (l >= 10000L) {
                    logger.info("...... alert .....");
                    logger.info("-------");
                }
                out.collect(value.toString());
            }
        }).print();

        env.execute();
    }
}

You need to configure log4j.properties; put this file under src/main/resources.

### Settings ###
log4j.rootLogger = debug,stdout,D,E

### Output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### Write logs at DEBUG level and above to /home/hao/Desktop/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = /home/hao/Desktop/error.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

### Write logs at ERROR level and above to /home/hao/Desktop/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = /home/hao/Desktop/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
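
The CUMULATE window used in tableDemo3 (step 5 seconds, maximum size 30 seconds) may be easier to follow with a small plain-Java sketch of which windows a single row falls into. The class CumulateWindowSketch is hypothetical and not part of Flink; it assumes epoch-aligned windows, ignores time zone handling, and assumes the maximum size is an integer multiple of the step.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of cumulate-window assignment for one timestamp. */
final class CumulateWindowSketch {
    /** Returns {start, end} pairs (milliseconds, end exclusive) of every window containing the timestamp. */
    static List<long[]> assign(long timestampMs, long stepMs, long maxSizeMs) {
        // All cumulate windows of one cycle share the same start.
        long start = timestampMs - Math.floorMod(timestampMs, maxSizeMs);
        List<long[]> windows = new ArrayList<>();
        // The window end grows by one step at a time until the maximum size is reached.
        for (long end = start + stepMs; end <= start + maxSizeMs; end += stepMs) {
            if (end > timestampMs) {
                windows.add(new long[] {start, end});
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : assign(12_000L, 5_000L, 30_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

A row at second 12 is counted in the windows ending at seconds 15, 20, 25 and 30, which is why the SUM(money) reported for one window_start keeps growing until the 30-second cycle ends.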
Date and time types

  1. DATE: a date type of the form year-month-day, with no time zone. Value range: [0000-01-01, 9999-12-31].

  2. TIME, TIME(p): a time type without time zone, of the form hour:minute:second[.fractional], with up to nanosecond precision. Range: 00:00:00.000000000 to 23:59:59.999999999. Unlike the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported; the semantics are closer to java.time.LocalTime. A time type with time zone is not provided. p is the number of fractional-second digits (the precision) and must be between 0 and 9, inclusive. If no precision is specified, p is 0.

  3. TIMESTAMP, TIMESTAMP(p): a timestamp type without time zone, of the form year-month-day hour:minute:second[.fractional], with up to nanosecond precision. Range: 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999. Unlike the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported; the semantics are closer to java.time.LocalDateTime. p is the number of fractional-second digits (the precision) and must be between 0 and 9, inclusive. If no precision is specified, p is 6. TIMESTAMP(p) WITHOUT TIME ZONE is equivalent to this type.

  4. TIMESTAMP WITH TIME ZONE, TIMESTAMP(p) WITH TIME ZONE: a timestamp type with time zone, of the form year-month-day hour:minute:second[.fractional] zone, with up to nanosecond precision. Range: 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59. p is the number of fractional-second digits (the precision) and must be between 0 and 9, inclusive. If no precision is specified, p is 6.

  5. TIMESTAMP_LTZ, TIMESTAMP_LTZ(p): a timestamp type with time zone semantics, of the form year-month-day hour:minute:second[.fractional] zone. Value range: [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]. p is the number of fractional-second digits, with value range [0, 9]. If p is not specified, it defaults to 6.

  6. INTERVAL YEAR TO MONTH: a group of year-month interval data types. Range: -9999-11 to +9999-11. It can be written as:

INTERVAL YEAR
INTERVAL YEAR(p)
INTERVAL YEAR(p) TO MONTH
INTERVAL MONTH

where p is the number of digits of the year (the year precision). p must be between 1 and 4, inclusive. If the year precision is not specified, p is 2.

Interval literal | Meaning
INTERVAL '3' YEAR | An interval of 3 years
INTERVAL '3' MONTH | An interval of 3 months
INTERVAL '3-4' YEAR TO MONTH | An interval of 3 years and 4 months

  7. INTERVAL DAY TO SECOND: a group of day-time interval data types. The interval has the form +days hours:minutes:seconds.fractional. Range: -999999 23:59:59.999999999 to +999999 23:59:59.999999999. It can be written as:

INTERVAL DAY
INTERVAL DAY(p1)
INTERVAL DAY(p1) TO HOUR
INTERVAL DAY(p1) TO MINUTE
INTERVAL DAY(p1) TO SECOND(p2)
INTERVAL HOUR
INTERVAL HOUR TO MINUTE
INTERVAL HOUR TO SECOND(p2)
INTERVAL MINUTE
INTERVAL MINUTE TO SECOND(p2)
INTERVAL SECOND
INTERVAL SECOND(p2)

where p1 is the number of digits of the days (the day precision) and p2 is the number of fractional-second digits (the fractional precision). p1 must be between 1 and 6, inclusive, and p2 must be between 0 and 9, inclusive. If p1 is not specified, it defaults to 2; if p2 is not specified, it defaults to 6.

Interval literal | Meaning
INTERVAL '3' DAY | An interval of 3 days
INTERVAL '2' HOUR | An interval of 2 hours
INTERVAL '25' MINUTE | An interval of 25 minutes
INTERVAL '45' SECOND | An interval of 45 seconds
INTERVAL '3 02' DAY TO HOUR | An interval of 3 days and 2 hours
INTERVAL '3 02:25' DAY TO MINUTE | An interval of 3 days, 2 hours and 25 minutes
INTERVAL '3 02:25:45' DAY TO SECOND | An interval of 3 days, 2 hours, 25 minutes and 45 seconds
INTERVAL '02:25' HOUR TO MINUTE | An interval of 2 hours and 25 minutes
INTERVAL '02:25:45' HOUR TO SECOND | An interval of 2 hours, 25 minutes and 45 seconds
INTERVAL '25:45' MINUTE TO SECOND | An interval of 25 minutes and 45 seconds
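
As a rough way to check how the interval literals above should be read, the following plain-Java sketch converts the quoted part of two literal forms into java.time values. IntervalLiteralSketch and its methods are hypothetical and are not Flink's parser; the sketch assumes unsigned, well-formed input, handles only the 'y-m' and 'd hh:mm:ss' forms, and omits sign and fractional-second handling.

```java
import java.time.Duration;
import java.time.Period;

/** Hypothetical sketch: reading the quoted part of two interval literal forms. */
final class IntervalLiteralSketch {
    /** Quoted part of INTERVAL 'y-m' YEAR TO MONTH, for example "3-4". */
    static Period parseYearToMonth(String literal) {
        String[] parts = literal.split("-");
        return Period.of(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), 0);
    }

    /** Quoted part of INTERVAL 'd hh:mm:ss' DAY TO SECOND, for example "3 02:25:45". */
    static Duration parseDayToSecond(String literal) {
        String[] dayAndTime = literal.split(" ");
        String[] time = dayAndTime[1].split(":");
        return Duration.ofDays(Long.parseLong(dayAndTime[0]))
                .plusHours(Long.parseLong(time[0]))
                .plusMinutes(Long.parseLong(time[1]))
                .plusSeconds(Long.parseLong(time[2]));
    }

    public static void main(String[] args) {
        System.out.println(parseYearToMonth("3-4").toTotalMonths() + " months");
        System.out.println(parseDayToSecond("3 02:25:45").getSeconds() + " seconds");
    }
}
```

The split between Period and Duration mirrors the two interval families: year-month intervals count calendar months, while day-time intervals are a fixed amount of time.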

3. Converting a dynamic output table to output data

A database table supports three kinds of DML operations: INSERT, UPDATE, and DELETE. A dynamic table, as a concept similar to a database table, supports the same three DML operations, with some differences.
Flink divides dynamic tables into three types:
1) Update-only: a table with one or more rows that is continuously updated.
2) Insert-only: a table that is changed only by INSERT, with no UPDATE or DELETE changes.
3) Tables with both insert and update behavior.
When a dynamic table is converted to a stream or written to an external system, the changes to the dynamic table must be translated into operations on the stream. These three types of dynamic table correspond to different kinds of data stream. Flink's Table API & SQL supports three ways of encoding the changes of a dynamic table.
(1) Append stream
An append stream supports only appending. It can encode only a dynamic table that is changed by INSERT alone; UPDATE and DELETE changes to existing data are not supported.
(2) Retract stream
A retract stream contains two types of messages: add messages and retract messages. The changes of the dynamic table map to message types as follows.
1) An INSERT change becomes an add message on the stream.
2) A DELETE change becomes a retract message on the stream.
3) An UPDATE change becomes two messages: a retract message for the old record and an add message for the new record.
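
The retract encoding listed above can be sketched in plain Java for a grouped count such as SELECT tag, COUNT(*) ... GROUP BY tag. RetractStreamSketch is a hypothetical class that only illustrates the message pattern; the "+(...)" and "-(...)" strings are an ad hoc notation, not Flink's output format.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a grouped count emitted as a retract stream. */
final class RetractStreamSketch {
    private final Map<String, Long> counts = new HashMap<>();

    /** Processes one input row and returns the messages emitted for it. */
    List<String> onRow(String tag) {
        List<String> out = new ArrayList<>();
        Long old = counts.get(tag);
        long updated = (old == null ? 0L : old) + 1;
        if (old != null) {
            // UPDATE: first retract the old record ...
            out.add("-(" + tag + ", " + old + ")");
        }
        // ... then add the new record (an INSERT produces only this message).
        out.add("+(" + tag + ", " + updated + ")");
        counts.put(tag, updated);
        return out;
    }

    public static void main(String[] args) {
        RetractStreamSketch stream = new RetractStreamSketch();
        System.out.println(stream.onRow("a"));
        System.out.println(stream.onRow("a"));
    }
}
```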


(3) Upsert stream
An upsert stream contains two types of messages: upsert messages and delete messages. A dynamic table that is converted to an upsert stream must have a primary key (which may be a composite key). For a dynamic table with a primary key, the changes map to message types as follows.
1) INSERT and UPDATE changes become upsert messages.
2) DELETE changes become delete messages.
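
For comparison, here is the same kind of grouped count encoded as an upsert stream, again as a plain-Java sketch. UpsertStreamSketch is hypothetical and the message strings are an ad hoc notation; the tag plays the role of the primary key.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: a grouped count emitted as an upsert stream keyed by tag. */
final class UpsertStreamSketch {
    private final Map<String, Long> counts = new HashMap<>();

    /** INSERT and UPDATE both produce a single upsert message for the key. */
    String onRow(String tag) {
        long updated = counts.merge(tag, 1L, Long::sum);
        return "UPSERT(" + tag + ", " + updated + ")";
    }

    /** DELETE produces a delete message that carries only the key. */
    String onDelete(String tag) {
        counts.remove(tag);
        return "DELETE(" + tag + ")";
    }

    public static void main(String[] args) {
        UpsertStreamSketch stream = new UpsertStreamSketch();
        System.out.println(stream.onRow("a"));
        System.out.println(stream.onRow("a"));
        System.out.println(stream.onDelete("a"));
    }
}
```

Because the receiver can locate the previous record by key, an update needs one message here instead of the two that a retract stream sends.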


4. Two ways to specify time attributes in SQL

To support Flink SQL time-window operations, the source table in SQL or the Table API must provide a time attribute (in effect, the time attribute is declared on the source table) and support time-related operations. Flink SQL provides two ways to specify the time attribute:

  1. Specify it in the CREATE TABLE DDL when creating the table

  2. Specify it on a DataStream, and use it when the DataStream is later converted to a Table

Event Time

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  ts BIGINT,
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());

// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:
WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));
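
To illustrate what the watermark expression user_action_time - INTERVAL '5' SECOND and the 10-minute TUMBLE window amount to, here is a minimal plain-Java sketch. EventTimeSketch is a hypothetical class, not a Flink API; it assumes epoch-aligned windows and models the watermark as the largest timestamp seen so far minus the allowed delay, so that it never moves backwards.

```java
/** Hypothetical sketch of a delayed watermark and tumbling-window assignment. */
final class EventTimeSketch {
    private final long maxDelayMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    EventTimeSketch(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    /** Observes one event timestamp and returns the current watermark. */
    long onEvent(long timestampMs) {
        // A late event does not lower the maximum, so the watermark only advances.
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
        return maxTimestampMs - maxDelayMs;
    }

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long tumbleStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        EventTimeSketch watermarks = new EventTimeSketch(5_000L);
        System.out.println(watermarks.onEvent(10_000L));
        System.out.println(watermarks.onEvent(8_000L)); // out-of-order event
        System.out.println(tumbleStart(605_000L, 600_000L));
    }
}
```

In this model a window can be considered complete once the watermark passes its end, which is why events up to 5 seconds late can still be counted.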
Processing Time

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream<Tuple2<String, String>> stream = ...;
// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
            .on($("user_action_time"))
            .as("userActionWindow"));

References:
Flink Table & SQL time attributes
Time Attributes | Apache Flink

Author: Vaguely like Mengmeng

Copyright notice
This article was created by [Game programming]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/176/202206250039263837.html