
Data collection: cleverly using a Bloom filter to extract a data summary

2022-06-22 14:38:00 superpigwin

Overview

In telemetry gathering, the volume of data is huge, so a distributed architecture is generally adopted, with message queues used to decouple the subsystems. The system is as follows:

  1. Devices report various kinds of data to the collector, which plays the role of a format converter: it converts the various device data into a single unified format.
  2. The collector writes the data to a message queue, and other back-end services, such as the analysis and alarm services, fetch data from the queue and carry out their actual business.

The unified format produced by the collector's conversion is as follows:

syntax = "proto3";

package talos.talosdata;

message Head {
  uint64 time = 1;
  string ip = 2; // machine IP
  // .......
}

message TalosAttrData {
  Head head = 1;
  bytes data = 2;
}

Within it, the bytes data field can be unpacked a second time into an array in the following format:

  message Data {
    int32 attr_id = 1;    // ID of the reported metric
    string attr_name = 2; // corresponding metric name
    float value = 3;
    // ......
  }

Problem: the back-end analysis systems waste resources

Because this is a general-purpose collection system, it is not convenient for it to be aware of specific businesses, so all types of messages are written to the same message queue.
Suppose one of the back-end systems is an alarm service that only cares about data with attr_id = 10001. It still has to consume the entire message queue and check every piece of data to see whether it is the target data.

The pseudocode is as follows :

kafkaMsg := kafka.Recv()

// first unpacking
baseData := new(TalosAttrData)
proto.Unmarshal(kafkaMsg.Bytes(), baseData)

// second unpacking
dataArr := make(DataArr, 0)
proto.Unmarshal(baseData.Data, &dataArr)

for _, data := range dataArr {
	if data.AttrId == 10001 {
		// do sth
	}
}

In fact, messages carrying 10001 may account for only 1% of the total, yet the consuming system has to unpack and traverse all of the data. This is obviously unreasonable.

The problem above arises for two main reasons:

  1. All types of data are written, without distinction, to the same message-queue topic; this is the main contradiction.
  2. The key piece of information, attr_id, is buried in the innermost envelope and can only be obtained by fully unpacking it.

Solving the first problem essentially means introducing fine-grained message distribution, i.e. a subscribe-on-demand system. Because the implementation must take scalability and operations into account, it is almost impossible to allocate a message-queue topic per attr_id at such a fine granularity. There are many details here, but they are not the point of this article, so we set them aside for now.

To solve the second problem: if we could determine whether a message contains the target data without parsing bytes data = 2;, the second unpacking could be avoided.

Solution: add a summary field to each message

As noted above, every message also carries a head field, which is available as soon as the first unpacking is done:

message Head {
  uint64 time = 1;
  string ip = 2; // machine IP
  // .......
}

To determine whether something exists in a set, the obvious tool is a Bloom filter.

What is a Bloom filter

A Bloom filter is very simple; readers who are unfamiliar with it may want to read this article first: https://blog.csdn.net/zhanjia/article/details/109313475
Suppose we use 8 bits as the Bloom filter's storage, together with two arbitrary hash functions (for example, ones derived from md5/sha256).
Initially, all 8 bits are 0:

0000 0000

The input is hello. Suppose the first hash of hello gives hash1("hello") % 8 = 7; set bit 7 of the storage to 1:

1000 0000

Likewise, the second hash of hello gives hash2("hello") % 8 = 3; set bit 3 to 1:

1000 1000

To check whether hello has been stored in the Bloom filter, you only need to check whether bits 3 and 7 are 1, since the results of the two hashes of hello are known:

assert bloomData & 0b10001000 == 0b10001000

Clearly: if bits 3 and 7 are both 1, then hello may exist in the Bloom filter; if either of them is not 1, then hello definitely does not exist in it.
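
To make this toy example concrete, here is a minimal runnable Go sketch. The FNV-based hashes are stand-ins chosen for this sketch, not part of the original scheme:

package main

import (
	"fmt"
	"hash/fnv"
)

// h hashes s with a one-byte seed and maps the result onto the 8 bit positions.
func h(seed byte, s string) uint {
	f := fnv.New32a()
	f.Write([]byte{seed})
	f.Write([]byte(s))
	return uint(f.Sum32() % 8)
}

func main() {
	var bits uint8
	bits |= 1 << h(1, "hello") // first hash sets one bit
	bits |= 1 << h(2, "hello") // second hash sets another
	fmt.Printf("%08b\n", bits)

	// Membership test: hello may exist only if both of its bits are set.
	mask := uint8(1<<h(1, "hello") | 1<<h(2, "hello"))
	fmt.Println(bits&mask == mask) // true => may exist; false => definitely absent
}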

The advantages of a Bloom filter are:

  1. It represents a set with very little storage (in this article's case, a single uint64).
  2. It can determine, with nothing more than a bitwise comparison, that a piece of data "definitely does not exist" or "may exist" in that set.

Extracting the summary

The usual way to use a Bloom filter is with a very large bit set to test membership for a large volume of data; for example, a crawler may use a Bloom filter N bits long to decide whether each of a huge number of URLs has already been visited. A conventional setup of that kind is sketched below.
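
For contrast, a conventional large Bloom filter for URL deduplication might look like the following minimal sketch; the bit-set size m and the way the two hash values are derived from one 64-bit FNV sum are assumptions made for illustration:

package main

import (
	"fmt"
	"hash/fnv"
)

const m = 1 << 20 // about a million bits for the whole URL set (assumed size)

var bits [m / 8]byte

// hashes derives two bit positions from a single 64-bit FNV sum.
func hashes(url string) (uint32, uint32) {
	f := fnv.New64a()
	f.Write([]byte(url))
	sum := f.Sum64()
	return uint32(sum % m), uint32((sum >> 32) % m)
}

func add(url string) {
	h1, h2 := hashes(url)
	bits[h1/8] |= 1 << (h1 % 8)
	bits[h2/8] |= 1 << (h2 % 8)
}

func mayContain(url string) bool {
	h1, h2 := hashes(url)
	return bits[h1/8]&(1<<(h1%8)) != 0 && bits[h2/8]&(1<<(h2%8)) != 0
}

func main() {
	add("https://example.com")
	fmt.Println(mayContain("https://example.com")) // true (may exist)
	fmt.Println(mayContain("https://example.org")) // very likely false (definitely absent)
}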

But this article does the opposite: it attaches a short summary to each piece of data, and the business side then checks whether the summary meets its condition.

  1. Add a new filter field to the head message body:

     message Head {
       uint64 time = 1;
       string ip = 2; // machine IP
       // .......
       uint64 filter = 10; // Bloom filter field
     }
    
  2. The functions are sketched below: any message can be summarized and stored in a uint64. Here hash1 is derived from md5 and hash2 from sha256; other hash algorithms would work just as well.

     // imports needed by the functions below
     import (
     	"crypto/md5"
     	"crypto/sha256"
     	"hash/crc32"
     )
     
     // SetBloomUInt64 uses a single uint64 as the Bloom filter storage:
     // it extracts a summary of msg, ORs it into origin, and returns the
     // updated filter value.
     func SetBloomUInt64(origin uint64, msg []byte) uint64 {
     	origin = origin | 1<<(hash1(msg)%64)
     	origin = origin | 1<<(hash2(msg)%64)
     	return origin
     }
     
     func hash1(msg []byte) uint32 {
     	hash := md5.New()
     	hash.Write(msg)
     	bts := hash.Sum(nil)
     	return crc32.ChecksumIEEE(bts) // fold the md5 digest down to 32 bits
     }
     
     func hash2(msg []byte) uint32 {
     	hash := sha256.New()
     	hash.Write(msg)
     	bts := hash.Sum(nil)
     	return crc32.ChecksumIEEE(bts) // fold the sha256 digest down to 32 bits
     }
    
  3. During the collector's format conversion, extract a summary from each message's attr_id values and accumulate it into the head.filter field. From then on, this summary can be used by every downstream business:

     // extract the Bloom summary
     var filter uint64
     for _, v := range data {
     	bs := make([]byte, 4)
     	binary.LittleEndian.PutUint32(bs, uint32(v.AttrId))
     	filter = bloom.SetBloomUInt64(filter, bs) // OR-based updates mean a duplicate summary does not change the result
     }
     result.Head.Filter = filter
    
  4. The key step: according to the filter field, a downstream business can roughly determine, right after parsing head, whether this message contains its target data, so the second parsing and traversal of data becomes unnecessary (the sketch after this list puts the whole consumer flow together):

     func blAttrID(attrID uint32) uint64 {
     	bts := make([]byte, 4)
     	binary.LittleEndian.PutUint32(bts, attrID)
     	return bloom.SetBloomUInt64(0, bts)
     }
     
     var bl10001 = blAttrID(10001) // the Bloom bits for attr_id 10001, computed from an origin of 0
     // ...
     	filter := talosData.Head.Filter
     	if filter&bl10001 == bl10001 {
     		// do sth
     	}
    
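Putting steps 2–4 together, here is a sketch of the full consumer flow. The filter check is only a pre-filter: because a Bloom match can be a false positive, the exact attr_id check after the second unpacking is still required when the filter matches. The type names (TalosAttrData, DataArr) follow the earlier pseudocode:

// talosData is the result of the first unpacking, as in the earlier pseudocode
filter := talosData.Head.Filter
if filter&bl10001 != bl10001 {
	return // 10001 is definitely absent: skip the second unpacking entirely
}

// possible match: do the second unpacking, then verify exactly
dataArr := make(DataArr, 0)
proto.Unmarshal(talosData.Data, &dataArr)
for _, data := range dataArr {
	if data.AttrId == 10001 { // still required: the filter match may be a false positive
		// do sth
	}
}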

Why this works

With the Bloom filter, every message's head carries a summary of all the attr_id values in []data. This rests on the following assumptions:

  1. A single message must not contain too many distinct kinds of attr_id. According to the literature, with a uint64 as the filter length and 2 hash functions, 10 distinct attr_id values give a false-positive rate of about 0.08, and 20 give about 0.2 (these figures are checked against the standard approximation in the sketch after this list).
  2. The false-positive rate means: the summary says the data is present, but it actually is not. If the summary says the data is absent, the data is definitely absent. A false positive therefore never causes a logical error; at worst it costs some redundant computation.
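
As a sanity check on those figures, the standard Bloom-filter false-positive approximation is p ≈ (1 - e^(-k·n/m))^k for m bits, k hash functions and n inserted elements. A minimal sketch evaluating it for the two cases above:

package main

import (
	"fmt"
	"math"
)

// falsePositiveRate evaluates the standard approximation
// p ≈ (1 - e^(-k*n/m))^k for m bits, k hash functions, n elements.
func falsePositiveRate(m, k, n float64) float64 {
	return math.Pow(1-math.Exp(-k*n/m), k)
}

func main() {
	fmt.Printf("n=10: %.2f\n", falsePositiveRate(64, 2, 10)) // ≈ 0.07
	fmt.Printf("n=20: %.2f\n", falsePositiveRate(64, 2, 20)) // ≈ 0.22
}

These come out close to the 0.08 and 0.2 quoted above.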

Through this small optimization, a little extra computation on the producing side provides a service for all subsequent businesses: with a single bitwise operation on a uint64, a business can decide whether a whole message is relevant to it at all, reducing the pressure on the business systems.


Copyright notice
This article was written by [superpigwin]; when reposting, please include a link to the original. Thanks.
https://yzsam.com/2022/173/202206221315560083.html