Flink data source disassembly and analysis (WikipediaEditsSource)
2022-07-23 07:05:00 【InfoQ】
- Wikipedia Edit Stream is a classic demo on the Flink official website. It processes messages from Wikipedia in real time; each message describes an edit that a user has just made to wiki content. The address is: https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html
- In the demo, the WikipediaEditsSource class is the data source that supplies Flink with real-time messages. Today we analyze its source code to understand how Flink obtains real-time data from the wiki, which is also a useful reference for writing custom data sources later;
Official explanation
- The official website describes the source as follows. Wikipedia exposes an IRC channel, and the log of edits to Wikipedia can be read from that channel:
Wikipedia provides an IRC channel where all edits to the wiki are logged.
- IRC is an application-layer protocol. For more details, see: https://en.wikipedia.org/wiki/Internet_Relay_Chat
Inheritance relationships
- First, look at the inheritance hierarchy of the WikipediaEditsSource class for a preliminary understanding, as in the following figure:
[Figure: class inheritance diagram of WikipediaEditsSource]
- As shown in the figure above, the RichFunction interface is responsible for opening and closing resources and for the runtime context, while the SourceFunction interface governs starting and stopping data production. WikipediaEditsSource ultimately implements both;
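- The division of labor between the two interfaces can be sketched without any Flink dependency. In the sketch below, `ToyLifecycle` and `ToySource` are hypothetical stand-ins for RichFunction and SourceFunction; they only imitate the rough shape of those interfaces and are not Flink's real signatures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for RichFunction: resource setup and teardown.
interface ToyLifecycle {
    void open();
    void close();
}

// Hypothetical stand-in for SourceFunction: starting and stopping data production.
interface ToySource<T> {
    void run(Consumer<T> collector);
    void cancel();
}

// A toy source that takes on both roles, as WikipediaEditsSource ultimately does.
class CountingSource implements ToyLifecycle, ToySource<Integer> {
    private volatile boolean isRunning = true;
    private final int limit;
    // Records the order in which the lifecycle methods are called.
    final List<String> calls = new ArrayList<>();

    CountingSource(int limit) {
        this.limit = limit;
    }

    @Override
    public void open() {
        calls.add("open");
    }

    @Override
    public void close() {
        calls.add("close");
    }

    @Override
    public void run(Consumer<Integer> collector) {
        calls.add("run");
        // Emit 0..limit-1 unless cancel() is called first.
        for (int i = 0; i < limit && isRunning; i++) {
            collector.accept(i);
        }
    }

    @Override
    public void cancel() {
        calls.add("cancel");
        isRunning = false;
    }
}

class LifecycleSketch {
    // Drives the source in the order a runtime would: open, run, close.
    static List<Integer> drive(CountingSource source) {
        List<Integer> out = new ArrayList<>();
        source.open();
        try {
            source.run(out::add);
        } finally {
            source.close();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drive(new CountingSource(3)));
    }
}
```

- The point of the split is that resource handling (open and close) stays separate from the production loop (run and cancel), so a runtime can manage both independently.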
Construction method
- The constructors show which parameters are fixed by default:
// Host name of the remote server
public static final String DEFAULT_HOST = "irc.wikimedia.org";
// Port of the remote server
public static final int DEFAULT_PORT = 6667;
// IRC channel to join
public static final String DEFAULT_CHANNEL = "#en.wikipedia";

private final String host;
private final int port;
private final String channel;

public WikipediaEditsSource() {
    this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_CHANNEL);
}

public WikipediaEditsSource(String host, int port, String channel) {
    this.host = host;
    this.port = port;
    this.channel = Objects.requireNonNull(channel);
}
- As the code above shows, the data comes from the server irc.wikimedia.org;
Main business code
- The main business logic lives in the run method of WikipediaEditsSource, which is invoked by StreamSource.run:
@Override
public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {
    try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {
        // Open the IRC connection
        ircStream.connect();
        // Join the specified channel
        ircStream.join(channel);
        try {
            while (isRunning) {
                // Poll the blocking queue, waiting at most 100 ms
                WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);
                // If an event arrived, call ctx.collect to emit it into Flink for downstream operators
                if (edit != null) {
                    ctx.collect(edit);
                }
            }
        } finally {
            // On exit, tell the server we are leaving the channel
            ircStream.leave(channel);
        }
    }
}
- Let's pick out the important parts of the code above;
What happens after connecting to the Wikipedia message server
- To understand how Flink connects to the Wikipedia data source, first expand the call ircStream.connect(), which leads to the connect method of the IRCConnection class:
public void connect() throws IOException {
    if (level != 0) // otherwise disconnected or connect
        throw new SocketException("Socket closed or already open ("+ level +")");
    IOException exception = null;
    Socket s = null;
    for (int i = 0; i < ports.length && s == null; i++) {
        try {
            // Open an ordinary Socket connection
            s = new Socket(host, ports[i]);
            exception = null;
        } catch (IOException exc) {
            if (s != null)
                s.close();
            s = null;
            exception = exc;
        }
    }
    if (exception != null)
        throw exception; // connection wasn't successful at any port
    prepare(s);
}
- The connect method shows that Flink talks to the Wikipedia data source server over an ordinary Socket connection. The IRC protocol is simply a matter of what is read from and written to that socket;
- The prepare method called at the end of connect is the key step. Expanding it:
protected void prepare(Socket s) throws IOException {
    if (s == null)
        throw new SocketException("Socket s is null, not connected");
    socket = s;
    level = 1;
    s.setSoTimeout(timeout);
    in = new BufferedReader(new InputStreamReader(s.getInputStream(),
            encoding));
    out = new PrintWriter(new OutputStreamWriter(s.getOutputStream(),
            encoding));
    // IRCConnection is a subclass of Thread; start() launches a thread that executes IRCConnection's run method
    start();
    // Send the registration messages required by the IRC protocol
    register();
}
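- The body of register() is not shown in this article, so the exact lines the library sends are unknown here. As a rough sketch of what IRC registration looks like on the wire, the following follows the general shape in RFC 2812: a NICK command and a USER command, each terminated by CRLF. The class name, method names, and the nickname and user values are all illustrative assumptions:

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

class IrcRegistrationSketch {
    // Writes the two registration commands, one per CRLF-terminated line.
    static void register(Writer out, String nick, String user, String realName) throws IOException {
        out.write("NICK " + nick + "\r\n");
        // "0 *" are the mode and unused parameters in the RFC 2812 form of USER.
        out.write("USER " + user + " 0 * :" + realName + "\r\n");
        out.flush();
    }

    // Returns the text that would have been written to the socket.
    static String registrationText(String nick, String user, String realName) {
        StringWriter buffer = new StringWriter();
        try {
            register(buffer, nick, user, realName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        // Illustrative values only.
        System.out.print(registrationText("flinkdemo", "flink", "Flink demo client"));
    }
}
```

- In the real connection the Writer would wrap the socket's output stream, in the same way prepare builds its PrintWriter.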
- The prepare method does two important things: it starts a child thread, and it sends the IRC registration messages. Next, let's see what that child thread does;
- Open the run method of IRCConnection:
public void run() {
    try {
        String line;
        while (!isInterrupted()) {
            line = in.readLine();
            if (line != null)
                get(line);
            else
                close();
        }
    } catch (IOException exc) {
        close();
    }
}
- The run method is simple: the child thread reads the strings sent by the remote end and calls get for every line it reads;
- The get method is long. It parses the line according to the IRC protocol and handles each command differently. Here we only need the following fragment, which shows what happens when a business message arrives:
// Whenever someone edits Wikipedia, a record whose command is PRIVMSG arrives here
if (command.equalsIgnoreCase("PRIVMSG")) { // MESSAGE
    IRCUser user = p.getUser();
    String middle = p.getMiddle();
    String trailing = p.getTrailing();
    for (int i = listeners.length - 1; i >= 0; i--)
        // Call the listener's onPrivmsg method
        listeners[i].onPrivmsg(middle, user, trailing);
}
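- The PRIVMSG branch relies on the line having already been split into a sender, a command, a middle part, and a trailing part. The library's own parser is not shown in this article; the sketch below is a simplified, hypothetical parser for the general IRC message shape `[:prefix] command middle [:trailing]`, and the sample line is made up for illustration:

```java
class IrcLineSketch {
    final String prefix;   // sender such as nick!user@host, empty if absent
    final String command;  // for example PRIVMSG
    final String middle;   // parameters before the trailing part, for example the channel
    final String trailing; // free text after " :", empty if absent

    IrcLineSketch(String prefix, String command, String middle, String trailing) {
        this.prefix = prefix;
        this.command = command;
        this.middle = middle;
        this.trailing = trailing;
    }

    static IrcLineSketch parse(String line) {
        String rest = line;
        String prefix = "";
        // An optional prefix starts with ':' and runs up to the first space.
        if (rest.startsWith(":")) {
            int prefixEnd = rest.indexOf(' ');
            if (prefixEnd < 0) {
                throw new IllegalArgumentException("no command in line: " + line);
            }
            prefix = rest.substring(1, prefixEnd);
            rest = rest.substring(prefixEnd + 1);
        }
        // The trailing part starts at the first " :" and may contain spaces.
        String trailing = "";
        int trailingStart = rest.indexOf(" :");
        if (trailingStart >= 0) {
            trailing = rest.substring(trailingStart + 2);
            rest = rest.substring(0, trailingStart);
        }
        // What remains is the command followed by the middle parameters.
        int commandEnd = rest.indexOf(' ');
        String command = commandEnd < 0 ? rest : rest.substring(0, commandEnd);
        String middle = commandEnd < 0 ? "" : rest.substring(commandEnd + 1).trim();
        return new IrcLineSketch(prefix, command, middle, trailing);
    }

    // The nickname is the part of the prefix before '!'.
    String nick() {
        int bang = prefix.indexOf('!');
        return bang < 0 ? prefix : prefix.substring(0, bang);
    }

    public static void main(String[] args) {
        IrcLineSketch msg = parse(":alice!alice@example.org PRIVMSG #en.wikipedia :hello world");
        System.out.println(msg.command + " to " + msg.middle + " from " + msg.nick() + ": " + msg.trailing);
    }
}
```

- For a PRIVMSG sent to a channel, the middle part is the channel name and the trailing part is the message text, which matches the arguments passed to onPrivmsg.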
- As the PRIVMSG branch shows, every such message from the remote end is passed to the onPrivmsg method of each listener. The listener registered here is a WikipediaIrcChannelListener object;
- Open the onPrivmsg method of WikipediaIrcChannelListener to see what happens once a message is received:
@Override
public void onPrivmsg(String target, IRCUser user, String msg) {
    LOG.debug("[{}] {}: {}.", target, user.getNick(), msg);
    // Build a WikipediaEditEvent, the data object used in the Flink job
    WikipediaEditEvent event = WikipediaEditEvent.fromRawEvent(
            System.currentTimeMillis(),
            target,
            msg);
    if (event != null) {
        // edits is a blocking queue; offer the WikipediaEditEvent to it without blocking
        if (!edits.offer(event)) {
            LOG.debug("Dropping message, because of full queue.");
        }
    }
}
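- One detail of onPrivmsg is worth isolating: offer never blocks, so when the queue is full the event is simply dropped. The sketch below shows that behavior with a standard ArrayBlockingQueue. The capacity of 2 and the string events are arbitrary choices for illustration; the queue type and capacity that Flink actually uses are not shown in this article:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferDropSketch {
    // Offers every event without blocking and returns how many were dropped.
    static int offerAll(BlockingQueue<String> queue, String... events) {
        int dropped = 0;
        for (String event : events) {
            // offer returns false immediately when the queue is full.
            if (!queue.offer(event)) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Capacity 2 is an arbitrary value chosen to force a drop.
        BlockingQueue<String> edits = new ArrayBlockingQueue<>(2);
        int dropped = offerAll(edits, "a", "b", "c");
        System.out.println("queued=" + edits.size() + " dropped=" + dropped);
    }
}
```

- Dropping instead of blocking keeps the reader thread responsive to the socket, at the cost of losing events when the consumer falls behind.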
- That covers the main logic: the data read from the Socket is parsed into WikipediaEditEvent objects, the type Flink uses in its real-time computation, and placed in a blocking queue. This is the main job of the reader child thread;
How to consume data in the queue
- From the previous analysis, we know : The received data is put into the blocking queue , Now back to WikipediaEditsSource Of run Let's look at the method , There is the operation of fetching data from the blocking queue :
while (isRunning) {
    // Poll the blocking queue, waiting at most 100 ms
    WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);
    // If an event arrived, call ctx.collect to emit it into Flink for downstream operators
    if (edit != null) {
        ctx.collect(edit);
    }
}
- As shown above, a while loop repeatedly polls the blocking queue and, whenever it gets an element, calls collect on the SourceContext to emit that record into the Flink environment for downstream processing;
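- The same loop can be tried out with only the standard library. The article does not show how isRunning is declared or changed, so the sketch below assumes a volatile flag that a cancel method sets to false; `PollLoopSketch`, `consume`, and the string items are hypothetical names used only for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PollLoopSketch {
    // Assumption: a volatile flag, so a cancel from another thread becomes visible to the loop.
    private volatile boolean isRunning = true;

    // Same shape as the source's loop: poll with a timeout, forward non-null items.
    void run(BlockingQueue<String> queue, Consumer<String> collector) throws InterruptedException {
        while (isRunning) {
            String item = queue.poll(100, TimeUnit.MILLISECONDS);
            if (item != null) {
                collector.accept(item);
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the loop on the calling thread and cancels once `expected` items have arrived.
    // `expected` must be at least 1 and no larger than the number of items that will arrive.
    static List<String> consume(BlockingQueue<String> queue, int expected) {
        PollLoopSketch loop = new PollLoopSketch();
        List<String> out = new ArrayList<>();
        try {
            loop.run(queue, item -> {
                out.add(item);
                if (out.size() >= expected) {
                    loop.cancel();
                }
            });
        } catch (InterruptedException e) {
            // Restore the interrupt status and return what was collected so far.
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        System.out.println(consume(queue, 2));
    }
}
```

- The timeout on poll is what lets the loop re-check the flag regularly: with a blocking take, a cancelled source could wait indefinitely on an empty queue.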
Summary
- This completes the analysis of the WikipediaEditsSource source code. To summarize:
- A Socket connection is established with the server irc.wikimedia.org;
- Once connected, everything read and written follows the IRC protocol, an application-layer protocol with its own formats, keywords, and commands. This analysis did not go into the protocol itself; interested readers can learn more here: https://en.wikipedia.org/wiki/Internet_Relay_Chat
- A child thread is started to read from the Socket. Each received message is turned into a WikipediaEditEvent object and put into a blocking queue;
- The original thread polls the blocking queue in a while loop and, when it gets data, calls ctx.collect, which emits the data into the Flink environment for other operators to use;
- That is how WikipediaEditsSource works. We now understand Flink data sources better and have a reference implementation for developing custom data sources later;
Follow me on InfoQ: Programmer Xinchen