Flink's DataSource Trilogy: Direct API
2020-11-06 20:59:00 【Programmer Xinchen】
Welcome to visit my GitHub:
https://github.com/zq2599/blog_demos
Content: a categorized index of all my original articles plus the accompanying source code, covering Java, Docker, Kubernetes, DevOps, and more.
This article is the first in the 《Flink's DataSource Trilogy》 series. The series explores Flink's DataSource through hands-on practice to lay a foundation for further study. It consists of three parts:
- Direct API: this article. Besides preparing the environment and the project, it covers the APIs that StreamExecutionEnvironment provides for creating data sources;
- Built-in connectors: the argument passed to StreamExecutionEnvironment's addSource method can be one of Flink's built-in connectors, such as Kafka or RabbitMQ;
- Custom sources: the argument passed to StreamExecutionEnvironment's addSource method can be a custom implementation of SourceFunction.
Links to the articles in the Flink DataSource Trilogy
- 《Flink's DataSource Trilogy, Part 1: Direct API》
- 《Flink's DataSource Trilogy, Part 2: Built-in Connectors》
- 《Flink's DataSource Trilogy, Part 3: Custom Sources》
About Flink's DataSource
The official documentation describes a DataSource this way: "Sources are where your program reads its input from." In other words, a DataSource is where an application gets its data, as marked by the two red boxes in the figure below:
DataSource types
For common sources such as text files, Kafka, and RabbitMQ, you can use the APIs or connectors that Flink provides directly. If these do not meet your needs, you can also develop your own. The figure below shows my own understanding:
Environment and version
The best way to master the built-in DataSources is hands-on practice. The environment and versions used here are:
- JDK: 1.8.0_211
- Flink: 1.9.2
- Maven: 3.6.0
- Operating system: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA: 2018.3.5 (Ultimate Edition)
Source code download
If you don't want to write the code yourself, the source code for the whole series can be downloaded from GitHub (https://github.com/zq2599/blog_demos). The addresses are listed in the table below:
| Name | Link | Remarks |
|---|---|---|
| Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub |
| Git repository URL (https) | https://github.com/zq2599/blog_demos.git | Repository URL of the project's source code, HTTPS protocol |
| Git repository URL (ssh) | git@github.com:zq2599/blog_demos.git | Repository URL of the project's source code, SSH protocol |
This Git project contains multiple folders. The application for this article is in the <font color="blue">flinkdatasourcedemo</font> folder, as shown in the red box below:
Create a project
- Run the following command in a console to start Flink's interactive project generator. Enter a groupId and an artifactId when prompted, and a Flink application is created (I entered groupId <font color="blue">com.bolingcavalry</font> and artifactId <font color="blue">flinkdatasourcedemo</font>):
```shell
mvn \
  archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.9.2
```
- The Maven project has now been generated. Import it into IDEA, as shown below:

- Import it as a Maven project:

- After a successful import, the project looks like this:

- The project is created, and you can start writing code.
Helper class Splitter
A frequently needed operation is splitting a string on spaces into a collection of Tuple2 elements, so let's put this operator in a shared class, Splitter.java:
```java
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

/**
 * Splits each line on spaces and emits a (word, 1) tuple for every word.
 */
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Skip null, empty, and whitespace-only lines
        if (StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }
        // Emit (word, 1) for each space-separated word
        for (String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
```
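One detail worth knowing about Splitter: `s.split(" ")` splits on every single space, so a line containing two consecutive spaces (or a leading space) produces an empty-string "word" that is counted too; only trailing empty strings are dropped by `String.split`. The dependency-free sketch below does not use Flink; it collects the `(word, 1)` pairs into a list so the behavior is easy to inspect, and it shows a variant that trims the line and splits on runs of whitespace. `SplitterSketch` and its method names are hypothetical, and whether you want the variant in `Splitter.flatMap` is a judgment call, not the author's code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Flink) of what Splitter emits for one line.
// SplitterSketch and its methods are hypothetical names for this demo only.
final class SplitterSketch {

    // Same splitting rule as Splitter.flatMap: split on every single space.
    static List<Map.Entry<String, Integer>> splitOnSingleSpace(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        if (line == null || line.trim().isEmpty()) {
            return out; // roughly Splitter's guard: emit nothing for blank lines
        }
        for (String word : line.split(" ")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    // Hypothetical variant: trim, then split on runs of whitespace (no empty words).
    static List<Map.Entry<String, Integer>> splitOnWhitespaceRuns(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        if (line == null || line.trim().isEmpty()) {
            return out;
        }
        for (String word : line.trim().split("\\s+")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        String line = "hello  flink"; // two spaces between the words
        System.out.println(splitOnSingleSpace(line));    // [hello=1, =1, flink=1]
        System.out.println(splitOnWhitespaceRuns(line)); // [hello=1, flink=1]
    }
}
```

With the original rule, the empty string becomes its own key after `keyBy(0)`, which is why odd `(,1)` counts can show up when input has irregular spacing.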
With everything in place, let's start the hands-on part with the simplest source: Socket.
Socket DataSource
A Socket DataSource connects to the specified host and port and reads text data from the network.
- Create a class named Socket.java in the new project:
```java
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to port 9999 on localhost and read lines of text
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // Every five seconds, split the strings received in that window on spaces,
        // count each word, and print the result
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)                    // key by the word (tuple field 0)
                .timeWindow(Time.seconds(5)) // 5-second tumbling window
                .sum(1)                      // sum the counts (tuple field 1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}
```
As the code above shows, StreamExecutionEnvironment.socketTextStream creates a Socket-type DataSource. Before starting the job, run <font color="blue">nc -lk 9999</font> in a console to enter interactive mode; any string you type there followed by Enter is sent to port 9999 on this machine.
- Run the Socket class in IDEA. Once it has started, go back to the console running <font color="blue">nc -lk 9999</font>, type some strings, and press Enter. The output shows that the Socket source is working:

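In the Socket job, `keyBy(0).timeWindow(Time.seconds(5)).sum(1)` groups words into consecutive, non-overlapping (tumbling) 5-second windows and emits one count per word per window when that window closes. In Flink 1.9 the default time characteristic is processing time, so a word lands in the window that covers the wall-clock time at which it is processed. The plain-Java sketch below simulates that grouping for a list of (timestamp, line) pairs; it is a model of the semantics under those assumptions, not Flink's implementation, and `WindowedWordCountSketch` plus the sample timestamps are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Simulation of a 5-second tumbling-window word count (not Flink code).
// WindowedWordCountSketch and the sample data in main are hypothetical.
final class WindowedWordCountSketch {

    static final long WINDOW_SIZE_MS = 5_000L;

    // Start of the tumbling window a timestamp falls into: floor(ts / size) * size.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Returns windowStart -> (word -> count), i.e. what sum(1) would emit per window.
    static Map<Long, Map<String, Integer>> count(long[] timestamps, String[] lines) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            Map<String, Integer> counts =
                    result.computeIfAbsent(windowStart(timestamps[i]), k -> new TreeMap<>());
            for (String word : lines[i].split(" ")) { // same split rule as Splitter
                counts.merge(word, 1, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 4_900L, 5_100L}; // processing times in ms
        String[] lines = {"a b", "a", "a"};
        // Window [0, 5000) sees "a b" and "a"; window [5000, 10000) sees only "a".
        System.out.println(count(ts, lines)); // {0={a=2, b=1}, 5000={a=1}}
    }
}
```

The key point the model shows is that counts reset at every window boundary: the third "a" starts a new window and is reported as 1, not 3.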
Collection DataSource (generateSequence)
- The collection-based DataSource APIs are shown in the figure below:
- Start with the simplest one, generateSequence, which creates a DataSource of numbers in a specified range:
```java
package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Use generateSequence to get a DataSource of Long values from 1 to 10
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        // Keep only even numbers, then print them
        dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return 0L == aLong.longValue() % 2L;
            }
        }).print();

        env.execute("API DataSource demo : generateSequence");
    }
}
```
- When run, it prints the even numbers:

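`generateSequence(1, 10)` produces every `long` from 1 to 10 with both bounds inclusive, so after the even-number filter the job should print 2, 4, 6, 8 and 10. As a quick cross-check that needs no cluster, the same computation can be written with plain `java.util.stream`; this is an illustrative equivalent, not Flink code, and `GenerateSequenceSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Plain-Java equivalent of generateSequence(1, 10) followed by the even-number filter.
// GenerateSequenceSketch is a hypothetical name; no Flink classes are involved.
final class GenerateSequenceSketch {

    static List<Long> evensBetween(long from, long to) {
        return LongStream.rangeClosed(from, to) // both bounds inclusive, like generateSequence
                .filter(n -> n % 2L == 0L)      // same predicate as the FilterFunction
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evensBetween(1, 10)); // [2, 4, 6, 8, 10]
    }
}
```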
Collection DataSource (fromElements + fromCollection)
- Let's try fromElements and fromCollection in one class. Create a <font color="blue">FromCollection</font> class that uses both APIs:
```java
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Create a List holding two Tuple2 elements
        // (Tuple2.of avoids the raw-type warnings of "new Tuple2(...)")
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(Tuple2.of("aaa", 1));
        list.add(Tuple2.of("bbb", 1));

        // Create a DataStream from the List
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        // Create a DataStream from several Tuple2 elements
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                Tuple2.of("ccc", 1),
                Tuple2.of("ddd", 1),
                Tuple2.of("aaa", 1)
        );

        // Merge the two DataStreams into one with union
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        // Count the occurrences of each word
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
```
- The output is as follows:

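Without a window, `keyBy(0).sum(1)` is a rolling aggregation: every incoming record updates the running total for its key, and the updated tuple is emitted immediately. Because `aaa` appears once in each of the two sources, the output should contain `(aaa,1)` and later `(aaa,2)` rather than a single final count; the relative order of records coming from the two unioned streams is not guaranteed. The plain-Java model below (no Flink; `RollingSumSketch` is a hypothetical name) reproduces the emit-on-every-record behavior for one assumed input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Model of a rolling keyed sum, i.e. what keyBy(0).sum(1) emits on an unwindowed stream.
// RollingSumSketch is hypothetical; tuples are rendered as "(word,count)" strings.
final class RollingSumSketch {

    static List<String> rollingSum(List<String> words) {
        Map<String, Integer> totals = new HashMap<>(); // per-key running state
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int total = totals.merge(word, 1, Integer::sum); // update state for this key
            emitted.add("(" + word + "," + total + ")");     // emit on every record
        }
        return emitted;
    }

    public static void main(String[] args) {
        // One possible interleaving of the fromCollection and fromElements streams.
        List<String> input = Arrays.asList("aaa", "bbb", "ccc", "ddd", "aaa");
        System.out.println(rollingSum(input));
        // [(aaa,1), (bbb,1), (ccc,1), (ddd,1), (aaa,2)]
    }
}
```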
File DataSource
- The ReadTextFile class below reads a text file at an absolute path and counts the words in its content:
```java
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Use a txt file as the data source
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        // Count the words and print them
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
```
- Make sure a file named README.txt exists at the absolute path used in the code. The output is as follows:
- Open the StreamExecutionEnvironment.java source and look at readTextFile, shown below. It delegates to readFile: the third argument of that call decides whether the text file is read once or scanned periodically for changes, and the fourth argument is the interval between periodic scans:
```java
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
    Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");

    TextInputFormat format = new TextInputFormat(new Path(filePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName(charsetName);

    return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
```
- The FileProcessingMode used above is an enum; its source is:
```java
@PublicEvolving
public enum FileProcessingMode {

    /** Processes the current contents of the path and exits. */
    PROCESS_ONCE,

    /** Periodically scans the path for new data. */
    PROCESS_CONTINUOUSLY
}
```
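The two modes differ in how often the path is scanned. `readTextFile` passes `PROCESS_ONCE` with an interval of `-1`, so the path is read once and the source then finishes. With `PROCESS_CONTINUOUSLY` (reachable by calling `readFile` directly), the path is rescanned every `interval` milliseconds, and per the Flink documentation a modified file is re-processed in its entirety, not just the appended part. The dependency-free sketch below is a simplified model of that selection rule, assuming that a scan picks up every file whose modification time is newer than the newest one seen in earlier scans; `FileScanSketch`, the file name, and the timestamps are hypothetical, and this is not Flink's actual monitoring code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of periodic path scanning (NOT Flink's implementation).
// FileScanSketch, the file name and the timestamps below are hypothetical.
final class FileScanSketch {

    private long maxSeenModTime = Long.MIN_VALUE; // newest modification time seen so far

    // One scan: return files modified after the newest one seen in earlier scans.
    List<String> scan(Map<String, Long> modTimesByFile) {
        List<String> toProcess = new ArrayList<>();
        long newMax = maxSeenModTime;
        for (Map.Entry<String, Long> e : modTimesByFile.entrySet()) {
            if (e.getValue() > maxSeenModTime) {
                toProcess.add(e.getKey()); // the whole file would be (re)read
                newMax = Math.max(newMax, e.getValue());
            }
        }
        maxSeenModTime = newMax;
        return toProcess;
    }

    public static void main(String[] args) {
        FileScanSketch monitor = new FileScanSketch();
        Map<String, Long> dir = new LinkedHashMap<>();
        dir.put("README.txt", 100L);
        System.out.println(monitor.scan(dir)); // [README.txt]  (PROCESS_ONCE stops after this)
        System.out.println(monitor.scan(dir)); // []            nothing changed
        dir.put("README.txt", 200L);           // file modified
        System.out.println(monitor.scan(dir)); // [README.txt]  re-processed in full
    }
}
```

Under this model, continuous mode can emit the same lines more than once when a file is edited, which is worth keeping in mind before using it for word counts.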
- Also note that the <font color="red">filePath</font> parameter of <font color="blue">readTextFile</font> is a URI string: besides a local file path, it can also be an HDFS address such as <font color="blue">hdfs://host:port/file/path</font>
This completes the hands-on part on creating DataSources through the direct API. Next, we will look at DataSources built on Flink's built-in connectors.
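Because `filePath` is a URI, the scheme (`file`, `hdfs`, and so on) is what tells Flink which file system implementation to use; reading `hdfs://` paths typically also requires Hadoop dependencies on the classpath. The snippet below uses only `java.net.URI` to show how a local path and an HDFS path break down into scheme, authority, and path; `UriSchemeSketch` and the `namenode:8020` address are hypothetical placeholders, not values from this project.

```java
import java.net.URI;

// Shows how file and HDFS paths decompose into URI components (java.net.URI only).
// UriSchemeSketch and the "namenode:8020" address are hypothetical.
final class UriSchemeSketch {

    static String describe(String path) {
        URI uri = URI.create(path);
        return "scheme=" + uri.getScheme()
                + ", authority=" + uri.getAuthority()
                + ", path=" + uri.getPath();
    }

    public static void main(String[] args) {
        System.out.println(describe("file:///Users/zhaoqin/temp/202003/14/README.txt"));
        // scheme=file, authority=null, path=/Users/zhaoqin/temp/202003/14/README.txt
        System.out.println(describe("hdfs://namenode:8020/file/path"));
        // scheme=hdfs, authority=namenode:8020, path=/file/path
    }
}
```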
Copyright notice
This article was written by [Programmer Xinchen]. Please include a link to the original when reposting. Thanks.