Overview
In telemetry gathering, the volume of data is huge, so a distributed architecture is generally adopted, with a message queue used to decouple the subsystems. The system looks roughly like this:
- Devices report various kinds of data to the collector. The collector handles format conversion, turning data from different devices into a unified format.
- The collector writes the data to a message queue. Other backend services, such as "analysis" and "alarm", fetch data from the message queue and carry out the actual business logic.
The unified format produced by the collector is as follows:
syntax = "proto3";
package talos.talosdata;
message Head {
uint64 time = 1;
string ip = 2; // Mechanical IP
// .......
}
message TalosAttrData {
Head head = 1;
bytes data = 2;
}
The bytes data field can itself be unpacked a second time, as an array of records in the following format:
```protobuf
message Data {
    int32 attr_id = 1;    // ID of the registered metric
    string attr_name = 2; // corresponding metric name
    float value = 3;
    // ......
}
```
Problem: the backend analysis system wastes resources
Because this is a general-purpose collection system, it is not convenient for it to know about specific business needs, so all types of messages are written to the same message queue.
Suppose one of the backend business systems is an alarm service that only cares about data with attr_id = 10001. It still has to consume the entire message queue and check every record to see whether it is the target data.
The pseudocode is as follows:
```go
kafkaMsg := kafka.Recv()

// First unpack: header plus the still-opaque data payload.
baseData := new(TalosAttrData)
proto.Unmarshal(kafkaMsg.Bytes(), baseData)

// Second unpack: decode the array of Data records.
dataArr := make(DataArr, 0)
proto.Unmarshal(baseData.Data, &dataArr)
for _, data := range dataArr {
	if data.AttrId == 10001 {
		// do sth
	}
}
```
In reality, messages containing 10001 may account for only 1% of the total, yet the consuming system has to unpack and traverse all of the data. This is clearly unreasonable.
The problem above has two main causes:
- All types of data are written indiscriminately to the same message queue topic. This is the main contradiction.
- The key piece of information, attr_id, is buried at the innermost level and can only be obtained by fully unpacking the message.
Solving the first problem essentially requires introducing fine-grained message distribution, that is, a subscribe-on-demand system. Because the implementation has to consider scalability and operations, it is almost impossible to allocate an overly fine-grained topic for every kind of attr_id. There are many details here, but they are not the point of this article, so they are set aside for now.
To solve the second problem: if we could determine whether a message contains the target data without parsing bytes data = 2;, the second unpacking could be avoided.
Solution: add a summary field to each message
As noted above, every message also carries a head field, which is already decoded during the first unpacking:
```protobuf
message Head {
    uint64 time = 1;
    string ip = 2; // machine IP
    // .......
}
```
To determine whether something exists in a set, a Bloom filter is the obvious choice.
What is a Bloom filter
Bloom filters are very simple; readers who are not familiar with them can read this article first: https://blog.csdn.net/zhanjia/article/details/109313475
Suppose we use 8 bits as the Bloom filter's storage and have two arbitrary hash functions (for example, based on md5 and sha256).
Initially, all 8 bits are 0:
0000 0000
The input is hello. Suppose the first hash gives hash1("hello") % 8 = 7, so bit 7 of the storage is set to 1:
1000 0000
Similarly, the second hash gives hash2("hello") % 8 = 3, so bit 3 is set to 1:
1000 1000
To check whether hello is stored in the Bloom filter, we only need to check whether bits 3 and 7 are 1, because the results of hello's two hashes are already known:
assert bloomData & 0b10001000 == 0b10001000
Obviously: if bits 3 and 7 are both 1, hello may exist in the Bloom filter; if either of them is not 1, hello definitely does not exist in the Bloom filter.
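The same toy example as a minimal Go sketch. The two hash functions here are placeholders hard-coded to the positions assumed above (7 and 3 for "hello"); they are not the md5/sha256-based hashes used later in this article:

```go
package main

import "fmt"

// Placeholder hashes matching the example: hash1("hello") % 8 == 7, hash2("hello") % 8 == 3.
func toyHash1(s string) uint { return 7 }
func toyHash2(s string) uint { return 3 }

// set marks the two bit positions for s in the 8-bit storage.
func set(bloom uint8, s string) uint8 {
	return bloom | 1<<toyHash1(s) | 1<<toyHash2(s)
}

// mayContain reports whether s may be in the filter (both of its bits are set).
func mayContain(bloom uint8, s string) bool {
	mask := uint8(1<<toyHash1(s) | 1<<toyHash2(s))
	return bloom&mask == mask
}

func main() {
	var bloom uint8             // 0000 0000
	bloom = set(bloom, "hello") // sets bits 7 and 3
	fmt.Printf("%08b\n", bloom) // 10001000
	fmt.Println(mayContain(bloom, "hello")) // true: may exist
}
```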
The advantages of a Bloom filter are:
- It represents a set using very little storage (in this article, a single uint64).
- With just a bitwise AND and a comparison, it can decide that a piece of data "definitely does not exist" or "may exist" in the set.
Extracting the summary
The usual way to use a Bloom filter is with a very large bit set, to test whether items from a large volume of data have been seen; for example, a crawler might use an N-bit Bloom filter to decide whether a massive number of URLs have already been visited.
This article does the opposite: it attaches a short summary to each piece of data, and the business side then checks whether that summary satisfies its condition.
In the head message body, a new filter field is added:

```protobuf
message Head {
    uint64 time = 1;
    string ip = 2; // machine IP
    // .......
    uint64 filter = 10; // Bloom filter field
}
```

The helper functions are as follows; they take a summary of any message and fold it into the uint64. Here hash1 is based on md5 and hash2 on sha256; other hash algorithms would work just as well.

```go
import (
	"crypto/md5"
	"crypto/sha256"
	"hash/crc32"
)

// SetBloomUInt64 uses a uint64 as the Bloom filter storage: it extracts a
// summary of msg, sets the corresponding bits in origin, and returns the
// resulting value.
func SetBloomUInt64(origin uint64, msg []byte) uint64 {
	origin = origin | 1<<(hash1(msg)%64)
	origin = origin | 1<<(hash2(msg)%64)
	return origin
}

func hash1(msg []byte) uint32 {
	hash := md5.New()
	hash.Write(msg)
	bts := hash.Sum(nil)
	return crc32.ChecksumIEEE(bts)
}

func hash2(msg []byte) uint32 {
	hash := sha256.New()
	hash.Write(msg)
	bts := hash.Sum(nil)
	return crc32.ChecksumIEEE(bts)
}
```

During the collector's format conversion, the summary of each message's attr_id values is extracted and accumulated into the head.filter field. This summary can then be used by every downstream business:

```go
// Extract the Bloom summary.
var filter uint64
for _, v := range data {
	bs := make([]byte, 4)
	binary.LittleEndian.PutUint32(bs, uint32(v.AttrId))
	filter = bloom.SetBloomUInt64(filter, bs)
	// The Bloom filter guarantees that setting the same summary repeatedly
	// does not change the result.
}
result.Head.Filter = filter
```

This is the key step. A downstream business can now check the filter field right after parsing head to roughly determine whether the message contains its target data, so the secondary parsing and traversal of data are no longer needed:

```go
func blAttrID(attrID uint32) uint64 {
	bts := make([]byte, 4)
	binary.LittleEndian.PutUint32(bts, uint32(attrID))
	return bloom.SetBloomUInt64(0, bts)
}

// bl10001 is 10001 run through the Bloom filter with an origin of 0.
var bl10001 = blAttrID(10001)

// ...
filter := talosData.Head.Filter
if filter&bl10001 == bl10001 {
	// do sth
}
```
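Putting it together, the alarm service's earlier consume loop can be adapted as follows. This is a sketch in the same pseudocode style as before, assuming the body runs inside the service's consume loop and reusing the assumed kafka.Recv, DataArr, and generated protobuf types:

```go
kafkaMsg := kafka.Recv()

// First unpack: header only; the data payload stays opaque for now.
baseData := new(TalosAttrData)
proto.Unmarshal(kafkaMsg.Bytes(), baseData)

// Cheap summary check: one AND plus one comparison on a uint64.
if baseData.Head.Filter&bl10001 != bl10001 {
	// 10001 definitely does not appear in this message; skip the second unpack.
	continue
}

// Only now pay for the second unpack and the traversal.
dataArr := make(DataArr, 0)
proto.Unmarshal(baseData.Data, &dataArr)
for _, data := range dataArr {
	if data.AttrId == 10001 {
		// do sth
	}
}
```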
Why this works
Through the Bloom filter, each message's head now carries a summary of all the attr_id values in []data. This works under the following assumptions:
- The number of distinct attr_id values contained in a single message cannot be too large. According to the literature, with a uint64 as the filter length and 2 hash functions, 10 distinct attr_id values give a false-positive rate of about 0.08, and 20 give about 0.2 (these figures are checked in the sketch after this list).
- The false-positive rate means: the summary says the data is contained, but it is actually not there. If the summary says the data is not present, then it is definitely not present. False positives therefore never cause logical errors; at worst they lead to some redundant computation.
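These figures can be reproduced with the standard Bloom filter false-positive approximation (1 - e^(-kn/m))^k, where m is the number of bits, k the number of hash functions, and n the number of distinct elements. A quick sketch:

```go
package main

import (
	"fmt"
	"math"
)

// falsePositiveRate is the standard approximation (1 - e^(-k*n/m))^k.
func falsePositiveRate(m, k, n float64) float64 {
	return math.Pow(1-math.Exp(-k*n/m), k)
}

func main() {
	fmt.Printf("%.2f\n", falsePositiveRate(64, 2, 10)) // ~0.07, i.e. roughly the 0.08 cited above
	fmt.Printf("%.2f\n", falsePositiveRate(64, 2, 20)) // ~0.22, i.e. roughly the 0.2 cited above
}
```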
Through this small optimization, a little extra computation on the producer side provides a service for all subsequent businesses. A business can decide with a single bitwise check on a uint64 whether a whole message is relevant to it, which reduces the pressure on the business systems.




