Druid Architecture and Implementation

2022-06-24 16:34:00 charmer

Druid is a real-time analytical (OLAP) database. It uses a column-oriented storage layout, a distributed shared-nothing architecture, and advanced index structures, which lets it query tables with ten billion rows with latencies on the order of seconds.

Introduction

Druid was motivated by the need to handle machine-generated data that is massive in volume and low in individual value. In the past, processing this kind of data required specialized hardware and dedicated teams, which was expensive, so the data was often simply discarded.

Hadoop uses commodity machines to store large amounts of data reliably, which solved the storage problem, but it still gives no guarantee on how quickly the data can be accessed. The Druid team tried to build real-time queries on top of an RDBMS and on top of NoSQL stores. Both delivered real-time queries, but they came with many problems: the data could not always be represented simply in two dimensions, flexibility was poor, aggregations could not be extended, continuous upgrades were not possible, and so on.

The gap between massive data storage and production-grade query performance guarantees is therefore where Druid fits. Druid's implementation borrows from a lot of proven experience, such as OLAP systems, interactive query systems, in-memory databases, and distributed data stores.

Problems solved

  1. An RDBMS or a NoSQL store cannot provide low-latency data ingestion and the query capabilities that interactive applications need at the same time.
  2. Multi-tenancy and high availability.
  3. Letting users and alerting systems make decisions in real time.

Architecture

A Druid cluster contains many nodes (processes) that run completely independently of each other, so a failure in intra-cluster communication has almost no impact on data availability.

As of version 0.20.0, Druid has 6 types of processes, divided into three groups:

  1. master:
    1. coordinator node: Responsible for scheduling data; it controls how historical nodes manage data indirectly, through zookeeper
    2. overlord node: Responsible for scheduling data ingestion; it schedules and assigns work to middle manager nodes through zookeeper
  2. data:
    1. middle manager node: Responsible for data ingestion
    2. historical node: Responsible for storing and querying data
  3. query:
    1. broker node: Responsible for forwarding queries to the data group
    2. router node (optional): Responsible for routing requests to the broker, coordinator, and overlord nodes

Due to space limitations, this article covers only the most important ones: the middle manager node, historical node, broker node, and coordinator node.

Real-Time Nodes

Real-time node data ingestion and query flow

In the current system the real-time node is implemented as the middle-manager node, which provides two functions: ingest and query. While running, the node builds an index over the events it receives, and those events are queryable immediately. A node only collects events for a small time range and periodically hands them off to historical nodes. Nodes are not connected to each other directly; instead, each one announces its online status and the data it serves through zookeeper.

The node maintains an in-memory index buffer (a JVM heap-based buffer) for all incoming data. The buffer is populated incrementally as events arrive and can be queried at the same time. To avoid JVM heap overflow, the node persists the in-memory index to disk either periodically or when a user-configured maximum row count is reached. The persisted format is described later under "Storage format". Each persisted index is read-only; when a query needs it, the node loads the data into off-heap memory.
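The buffering behaviour described above can be sketched in a few lines of Python. This is only a toy model under stated assumptions, not Druid's implementation: the class `RealtimeIndexBuffer`, the `max_rows` parameter, and the use of plain lists and tuples are all hypothetical, and time-based persistence is left out.

```python
from dataclasses import dataclass, field


@dataclass
class RealtimeIndexBuffer:
    """Toy model of the in-memory index buffer of a real-time node."""

    max_rows: int                                   # hypothetical user-set row limit
    rows: list = field(default_factory=list)        # mutable in-memory (heap) index
    persisted: list = field(default_factory=list)   # read-only persisted indexes

    def ingest(self, event: dict) -> None:
        """Add one event and persist when the row limit is reached."""
        self.rows.append(event)
        if len(self.rows) >= self.max_rows:
            self.persist()

    def persist(self) -> None:
        """Freeze the in-memory rows into an immutable persisted index."""
        if self.rows:
            self.persisted.append(tuple(self.rows))
            self.rows = []

    def query(self, predicate) -> list:
        """Answer a query from the persisted indexes and the live buffer."""
        hits = [e for index in self.persisted for e in index if predicate(e)]
        hits += [e for e in self.rows if predicate(e)]
        return hits


buffer = RealtimeIndexBuffer(max_rows=3)
for i in range(7):
    buffer.ingest({"id": i, "page": "a" if i % 2 == 0 else "b"})

matches = buffer.query(lambda e: e["page"] == "a")
```

After seven events with a limit of three rows, two indexes have been persisted and one row is still in memory, yet the query sees all of them.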

The node schedules a periodic background task that scans the locally persisted indexes. The task merges the indexes persisted over a span of time into a single unit called a segment. In the hand-off stage, the node first uploads the segment to deep storage (usually a distributed store). The four steps ingest, persist, merge, and hand off flow into one another, and no data is lost at any step. The following figure walks through the process in detail:

The ingest, persist, merge, and hand-off process

Suppose a node starts running at 13:37. It only accepts data for the current hour or the next hour. When it begins ingesting, the node announces to zookeeper that it serves a segment for the 13:00 to 14:00 interval. Every ten minutes (configurable), the node persists its in-memory index to disk. Near the end of the 13:00 to 14:00 interval, the node will probably start receiving data for 14:00 to 15:00. When that happens, it creates a new in-memory index and announces that it also serves a segment for 14:00 to 15:00. The node does not merge the persisted 13:00 to 14:00 indexes right away; it waits for a configurable window period so that late events can still arrive. When the window period ends, the node merges the 13:00 to 14:00 indexes into a single read-only segment and hands it off (uploads it to deep storage). Only after another node in the cluster (a historical node) announces that it serves this segment does the node delete its local data and announce that it no longer serves it.
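The timeline above boils down to three small time calculations, sketched here in Python. The function names are hypothetical, the hourly granularity comes from the example, and the 10-minute `WINDOW_PERIOD` is an assumed value for the configurable wait, not a Druid default.

```python
from datetime import datetime, timedelta

SEGMENT_GRANULARITY = timedelta(hours=1)   # one segment per hour, as in the example
WINDOW_PERIOD = timedelta(minutes=10)      # assumed value for the configurable wait


def segment_interval(event_time: datetime) -> tuple:
    """Return the [start, end) hourly interval that a timestamp falls into."""
    start = event_time.replace(minute=0, second=0, microsecond=0)
    return start, start + SEGMENT_GRANULARITY


def accepts(now: datetime, event_time: datetime) -> bool:
    """A node only accepts events for the current hour or the next hour."""
    current_start, current_end = segment_interval(now)
    return current_start <= event_time < current_end + SEGMENT_GRANULARITY


def ready_to_merge(now: datetime, interval_end: datetime) -> bool:
    """An interval is merged and handed off once the window period has passed."""
    return now >= interval_end + WINDOW_PERIOD


start_time = datetime(2022, 6, 24, 13, 37)
interval = segment_interval(start_time)
```

With these definitions, a node started at 13:37 owns the 13:00 to 14:00 interval, accepts an event stamped 14:59 but not one stamped 15:00, and merges the 13:00 to 14:00 indexes at 14:10.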

Availability and scalability

The middle-manager node is a data consumer. Usually, for durability, a message bus (e.g. kafka) sits between the message producers and the middle-manager nodes. Under normal conditions, the time from event creation to consumption is on the order of hundreds of milliseconds.

The message bus serves two purposes:

  1. Buffering. The message bus keeps a position offset that indicates how far a middle-manager node has read in the event stream, so the offset can also be adjusted manually. Each time the node persists its in-memory index to disk, it updates the offset. When a node fails and recovers, and its persisted data on disk was not lost, it can reload all persisted indexes from disk and continue reading from the last offset it committed.
  2. Letting multiple middle-manager nodes read the data at the same time. Multiple nodes can read the same time range of data simultaneously to create replicas of the events. The same model also allows the stream to be partitioned so that each node consumes a portion of it, which makes it possible to add middle-manager nodes seamlessly (raw data has been consumed at rates of up to 500 MB/s).
How offsets are used
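The recovery behaviour in point 1 can be illustrated with a small simulation. This is a sketch under assumptions: `OffsetTrackingConsumer`, `persist_every`, and the list standing in for a message bus partition are all hypothetical, and no real Kafka API is used.

```python
class OffsetTrackingConsumer:
    """Toy consumer that commits its stream offset only when it persists."""

    def __init__(self, stream: list, persist_every: int):
        self.stream = stream                  # stands in for a message bus partition
        self.persist_every = persist_every    # hypothetical persist threshold, in events
        self.position = 0                     # next offset to read
        self.committed_offset = 0             # offset saved together with the last persist
        self.in_memory = []                   # events not yet persisted
        self.on_disk = []                     # events that survive a crash

    def poll(self, count: int) -> None:
        """Read up to `count` events, persisting whenever the threshold is hit."""
        for _ in range(count):
            if self.position >= len(self.stream):
                return
            self.in_memory.append(self.stream[self.position])
            self.position += 1
            if len(self.in_memory) >= self.persist_every:
                self.persist()

    def persist(self) -> None:
        """Move in-memory events to disk and commit the matching offset."""
        self.on_disk.extend(self.in_memory)
        self.in_memory = []
        self.committed_offset = self.position

    def crash_and_recover(self) -> None:
        """Lose in-memory state, keep the disk, resume from the committed offset."""
        self.in_memory = []
        self.position = self.committed_offset


consumer = OffsetTrackingConsumer(stream=list(range(10)), persist_every=4)
consumer.poll(6)               # persists events 0..3; events 4 and 5 are only in memory
consumer.crash_and_recover()   # events 4 and 5 are lost locally but still on the bus
consumer.poll(10)              # re-reads from offset 4 up to the end of the stream
consumer.persist()
```

Because the offset is committed only together with a persist, the events that were in memory at the time of the crash are read again after recovery, and nothing is lost or duplicated on disk.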

Historical Nodes

historical nodes load and serve the read-only segments produced by middle-manager nodes. In real workflows, the data loaded by historical nodes is immutable, and these nodes do most of the query work. They follow a shared-nothing architecture: nodes have no contention with each other and only know how to load, drop, and serve read-only segments.

Like middle-manager nodes, historical nodes announce their online status and the data they serve in zookeeper. Instructions to load and drop segments are sent through zookeeper and include where the segment is located in deep storage and how to decompress and process it. Before downloading a requested segment, the node first checks whether it is already in its local cache. Once processing is finished, the node announces in zookeeper that the segment is queryable. The local cache lets historical nodes be updated and restarted quickly: at startup, a node examines its cache and announces in zookeeper whatever it finds there.

historical node loading a segment from deep storage
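The load flow above (check the cache, download if needed, then announce) can be sketched as follows. The `HistoricalNode` class, the dictionary standing in for deep storage, and the `announced` set standing in for zookeeper announcements are illustrative assumptions only.

```python
class HistoricalNode:
    """Toy model of how a historical node handles load and drop instructions."""

    def __init__(self, deep_storage: dict):
        self.deep_storage = deep_storage   # segment id -> segment bytes (stand-in)
        self.local_cache = {}              # segments already on local disk
        self.announced = set()             # segments announced as queryable
        self.downloads = 0                 # how many times deep storage was read

    def load(self, segment_id: str) -> None:
        """Handle a load instruction: check the cache first, then download."""
        if segment_id not in self.local_cache:
            self.local_cache[segment_id] = self.deep_storage[segment_id]
            self.downloads += 1
        self.announced.add(segment_id)     # stands in for the zookeeper announcement

    def drop(self, segment_id: str) -> None:
        """Handle a drop instruction: stop serving and remove the local copy."""
        self.announced.discard(segment_id)
        self.local_cache.pop(segment_id, None)

    def restart(self) -> None:
        """On startup, re-announce whatever is already in the local cache."""
        self.announced = set(self.local_cache)


node = HistoricalNode(deep_storage={"seg-13h": b"...", "seg-14h": b"..."})
node.load("seg-13h")
node.load("seg-13h")   # the second request is served from the local cache
node.load("seg-14h")
node.restart()
```

Three load instructions cause only two downloads, and after the restart the node serves both segments again without touching deep storage.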

Because segments are read-only, historical nodes get read consistency. Read-only segments also allow a simple parallelization model: historical nodes can scan and aggregate immutable blocks concurrently.

Tiers

historical nodes can be grouped into tiers, and all nodes in a given tier share the same configuration. Each tier can have its own performance and fault-tolerance parameters. Tiers exist so that segments with different priorities can be distributed according to their importance. For example, you can set up a hot tier whose historical nodes have plenty of memory and CPU, and configure it to load the most frequently accessed segments.

For hands-on experience from engineers, see this article: Zhihu's Druid cluster optimization practice

Availability

zookeeper only controls adding and removing segments; it does not affect the availability of segments that are already in the cache. Because queries are served over HTTP, zookeeper problems do not affect queries on existing data.

Broker Nodes

The broker node is the query router for historical nodes and middle-manager nodes. From the metadata published in zookeeper, a broker node knows which segments are queryable and on which nodes they live. It routes each query to the right nodes, merges the partial results, and returns the final result to the caller.

Cache

The broker node keeps a cache with an LRU eviction strategy. The cache can live in local heap memory or in an external distributed key-value store. Whenever a broker node receives a query, it first maps the query to a set of segments. Results for some of those segments may already be in the cache and need not be recomputed. For the rest, the broker node forwards the computation to the right middle-manager and historical nodes. Once the historical nodes return their results, the broker node caches them. Note that queries against middle-manager nodes are over real-time data, which can still change, so those results are not cached.

broker node processing a query
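The caching behaviour described above can be sketched as a per-segment LRU cache in front of a fan-out. Everything here is a simplification under assumptions: `SegmentResultCache` and `broker_count` are hypothetical names, the "query" is a plain row count, and the data nodes are replaced by in-process lists.

```python
from collections import OrderedDict


class SegmentResultCache:
    """Small LRU cache keyed by (query, segment id)."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.entries = OrderedDict()

    def get(self, key):
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)         # mark as most recently used
        return self.entries[key]

    def put(self, key, value) -> None:
        self.entries[key] = value
        self.entries.move_to_end(key)
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict the least recently used entry


def broker_count(query: str, segments: dict, realtime: set, cache: SegmentResultCache):
    """Sum per-segment row counts, computing only what the cache cannot answer.

    `segments` maps a segment id to its rows; `realtime` holds the ids served by
    middle-manager nodes, whose results are never cached.
    Returns (total, number of segments that had to be computed).
    """
    total, computed = 0, 0
    for segment_id, rows in segments.items():
        cached = None if segment_id in realtime else cache.get((query, segment_id))
        if cached is None:
            cached = len(rows)                # stands in for the data node's work
            computed += 1
            if segment_id not in realtime:
                cache.put((query, segment_id), cached)
        total += cached
    return total, computed


cache = SegmentResultCache(capacity=10)
segments = {"hist-1": [1, 2, 3], "hist-2": [4, 5], "rt-1": [6]}
first = broker_count("count", segments, {"rt-1"}, cache)
second = broker_count("count", segments, {"rt-1"}, cache)
```

The first call computes all three segments; the second call recomputes only the real-time segment, because the two historical results come from the cache.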

Availability

Data can still be queried while zookeeper is down. If a broker node cannot communicate with zookeeper, it uses its last known view of the cluster and forwards queries accordingly. In other words, it assumes that the data in the cluster is no longer changing or moving.

Coordinator Nodes

coordinator nodes are mainly responsible for data management and for distributing segments across historical nodes. The coordinator tells historical nodes to load new segments, drop outdated segments, replicate segments, and move segments to balance load. To keep views stable, Druid manages read-only segments with a multi-version concurrency control (MVCC) swapping protocol: if the data in a read-only segment is completely obsoleted by newer segments, the outdated segment is dropped from the cluster. coordinator nodes go through a leader election that picks one node to perform the coordinator role; the remaining nodes serve as redundant backups.
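The idea of newer segments obsoleting older ones can be sketched as a version comparison per interval. This is a deliberately simplified sketch: it assumes segments cover identical intervals and that versions compare as strings, and the function `visible_segments` and the field names are hypothetical.

```python
def visible_segments(segments: list) -> list:
    """Keep, for each interval, only the segment with the highest version."""
    newest = {}
    for segment in segments:
        current = newest.get(segment["interval"])
        if current is None or segment["version"] > current["version"]:
            newest[segment["interval"]] = segment
    return sorted(newest.values(), key=lambda s: s["interval"])


segments = [
    {"id": "a", "interval": "13:00/14:00", "version": "v1"},
    {"id": "b", "interval": "13:00/14:00", "version": "v2"},
    {"id": "c", "interval": "14:00/15:00", "version": "v1"},
]
visible = visible_segments(segments)
visible_ids = {s["id"] for s in visible}
obsolete = [s["id"] for s in segments if s["id"] not in visible_ids]
```

Here segment `a` is fully covered by the newer segment `b` for the same interval, so it is the one that would be dropped.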

A coordinator node runs periodically to determine the current state of the cluster. It makes decisions by comparing the expected state of the cluster with its actual state at run time. Like the other nodes, a coordinator node maintains a connection to zookeeper, and it also maintains a connection to the metadata storage. One key piece of information there is a table listing all the segments that historical nodes should serve. Any process that creates segments, such as a middle-manager node, can update this table. The metadata storage also holds a rule table that controls how segments are created, dropped, and replicated in the cluster.

Rules

Rules control how segments are loaded onto and dropped from the historical nodes of the cluster. They specify how segments are assigned to historical node tiers and how many replicas of a segment each tier should hold. They also control when segments are dropped from the cluster entirely. Rules are usually defined over a period of time. For example, a user can write rules that load the most recent month of segments into a hot tier, load the most recent year into a cold tier, and drop anything older.

A coordinator node loads a set of rules from the metadata storage. A rule may apply to a specific data source, or a default set of rules may apply to all data sources. The coordinator node walks over all available segments and matches each segment with the first rule that applies to it.
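First-match rule evaluation can be sketched as below, using the hot/cold/drop example from the text. The rule dictionaries, their field names, and the replica counts are hypothetical and do not follow Druid's actual rule JSON format.

```python
from datetime import datetime, timedelta

# Hypothetical rules in priority order: the last month goes to the hot tier,
# the last year to the cold tier, and everything else is dropped.
RULES = [
    {"action": "load", "period": timedelta(days=30), "tier": "hot", "replicas": 2},
    {"action": "load", "period": timedelta(days=365), "tier": "cold", "replicas": 1},
    {"action": "drop", "period": None},
]


def match_rule(segment_end: datetime, now: datetime, rules: list) -> dict:
    """Return the first rule whose period covers the segment."""
    for rule in rules:
        period = rule["period"]
        if period is None or segment_end > now - period:
            return rule
    raise ValueError("no rule matched; a catch-all rule is expected at the end")


now = datetime(2022, 6, 24)
recent = match_rule(datetime(2022, 6, 10), now, RULES)
older = match_rule(datetime(2021, 12, 1), now, RULES)
ancient = match_rule(datetime(2019, 1, 1), now, RULES)
```

Order matters: a two-week-old segment also satisfies the one-year rule, but it is assigned to the hot tier because that rule comes first.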

Load balancing

In production, queries usually touch dozens or even hundreds of segments. Because each historical node has limited resources, segments must be distributed across the cluster so that the load is not too unbalanced. Determining the best distribution, however, requires some knowledge of query patterns. Typically, queries cover recent segments that span a contiguous time interval within a single data source. On average, queries that access smaller segments are faster.

These query patterns suggest replicating the most recent segments at a higher rate, spreading large segments that are close in time across different historical nodes, and co-locating segments from different data sources. To distribute and balance segments in the cluster, Druid uses a cost-based optimization procedure that takes into account each segment's data source, recency, and size. The algorithm itself is not covered here.

Replication

coordinator nodes can tell different historical nodes to load copies of the same segment. The number of replicas is configurable; set more replicas if you need higher fault tolerance. Replicated segments are treated the same way as the originals and follow the same load-balancing algorithm. With replication, the failure of a single historical node becomes transparent in Druid. The same property makes it possible to take historical nodes offline for updates, backups, and software upgrades without interruption. (I find this a little confusing: done this way, each node has to be shut down and restarted by hand; see SRE.)

Availability

The coordinator node depends on zookeeper and the metadata storage.

The coordinator node uses zookeeper to determine which historical nodes already exist in the cluster. If zookeeper is unavailable, the coordinator node can no longer send instructions to assign, balance, and drop segments. Data availability is not affected.

Druid uses the metadata storage to hold operational information and the metadata about which segments should exist in the cluster. If the metadata storage is unavailable, the only consequence is that the coordinator node cannot obtain this information and therefore cannot perform its tasks. The broker nodes, historical nodes, and middle-manager nodes in the cluster keep working and remain queryable.

In short, if an external dependency responsible for coordination goes down, the cluster stays as it is.

Storage format

Druid stores data in columns, which suits aggregating event streams best. With column storage, a query loads and scans only the columns it needs, which reduces load. Druid has three types of columns (described below) and uses different compression methods for them to reduce the cost of keeping them in memory and on disk.

Take strings as an example. Storing strings directly is unnecessarily expensive, so Druid stores them with dictionary encoding: each unique string is mapped to a unique integer identifier, and the original column is represented by an integer array plus the map. The resulting integer array compresses very well; Druid compresses it with the LZF algorithm.

In real OLAP workflows, queries usually ask for aggregates of a set of metrics over the rows that satisfy a set of dimension specifications. Dimensions are usually strings and metrics are usually numeric. Druid builds an additional inverted index for string columns so that only the rows relevant to a query filter are scanned. Using bitmaps to perform boolean operations in this way is common in search engines. Druid compresses the bitmaps with the Concise algorithm.

Segment storage format

Druid stores data as a columnar table. Columns fall into three categories:

  1. timestamp: Currently each table has exactly one; it is the basis of OLAP.
  2. dimension: Stores data that can be accessed directly, such as strings, integers, and floating-point numbers. Supports filtering and aggregation.
  3. metric: Stores derived (secondary) values. Supports aggregation.

For timestamp and metric columns, storage is simple: they are arrays of integers or floating-point numbers compressed with LZ4 (configurable). Once a query has determined which rows it needs, those rows are decompressed, the relevant rows are extracted, and the required aggregation is applied to produce the result.

String dimension columns are different, because they support both filtering and grouping. Each one stores three data structures (the third, the bitmap, is optional):

  1. A dictionary that maps values to integer ids
  2. A list of the column's values, encoded using the dictionary from 1
  3. For each distinct value in the column, a bitmap that indicates which rows contain that value

Now consider the example given on the Druid official website:

Data example

For the string dimension column Page in this small table, Druid keeps three data structures:

1. A dictionary that maps values to integer ids
{
  "Justin Bieber": 0,
  "Ke$ha"        : 1
}

2. A list of the column's values, encoded using the dictionary from 1
[
 0,
 0,
 1,
 1
]

3. For each distinct value in the column, a bitmap that indicates which rows contain that value
value="Justin Bieber": [1,1,0,0]
value="Ke$ha":         [0,0,1,1]

Why these three data structures? The dictionary maps strings to integers, so that the values in structures 2 and 3 can be represented compactly and repeated strings are not stored over and over. The bitmaps in structure 3 (used as an inverted index) make filtering operations such as AND and OR fast. When filtering, Druid only needs to look at the rows whose bit in the target value's bitmap is not 0; for a group by, it only fetches those non-zero rows for further processing.
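The three structures and the bitmap filtering can be reproduced with a short Python sketch. It is an illustration only: the helper names are hypothetical, bitmaps are plain uncompressed lists, and ids are assigned in order of first appearance, which may differ from how Druid orders its dictionary.

```python
def encode_column(values: list):
    """Build the dictionary, the encoded list, and one bitmap per distinct value."""
    dictionary = {}
    encoded = []
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary)   # ids follow first appearance here
        encoded.append(dictionary[value])
    bitmaps = {
        value: [1 if code == value_id else 0 for code in encoded]
        for value, value_id in dictionary.items()
    }
    return dictionary, encoded, bitmaps


def bitmap_or(a: list, b: list) -> list:
    """Rows that match either filter."""
    return [x | y for x, y in zip(a, b)]


def bitmap_and(a: list, b: list) -> list:
    """Rows that match both filters."""
    return [x & y for x, y in zip(a, b)]


def matching_rows(bitmap: list) -> list:
    """Row numbers whose bit is set; only these rows need to be scanned."""
    return [row for row, bit in enumerate(bitmap) if bit]


page = ["Justin Bieber", "Justin Bieber", "Ke$ha", "Ke$ha"]
dictionary, encoded, bitmaps = encode_column(page)
either = bitmap_or(bitmaps["Justin Bieber"], bitmaps["Ke$ha"])
both = bitmap_and(bitmaps["Justin Bieber"], bitmaps["Ke$ha"])
```

For the four-row Page column this yields the same dictionary, encoded list, and bitmaps as the example above; the OR of the two bitmaps selects every row, and the AND selects none.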

Note that bitmaps consume a lot of resources: the uncompressed bitmap size is the number of rows * the column cardinality. Although these bitmaps are very sparse and highly compressible, consider disabling them for string dimensions whose values rarely repeat or that are rarely used in filters and aggregations.

Performance

Configuration

Our cluster is configured as follows:

12 historical nodes, each with 8 cores and 16 GB of memory, and 2 TB of cache.

20 middle-manager nodes, each with 6 cores and 16 GB of memory, and 200 GB of cache.

Data ingestion

Ingestion performance is hard to measure: each task's peon JVM is configured differently, task durations and the number of data sources vary, cores run at different frequencies and memory at different speeds, and even the fields of a table and whether they need special processing differ.

Our cluster is still far from Druid's ingestion limit. The highest rate we currently see is 600,000 rows/minute/core, which is about 10,000 rows/second/core.

Figure given by the Druid development team: 10-100K events/second/core

Data query

I designed a set of SQL query experiments on a data set at the 80-billion-row scale. The results are presented below:

test1

select model, count(*) as cnt from "tencent-json_kv_3"
where app_id='620' group by model order by cnt desc

test2

select model, os, count(*) as cnt from "tencent-json_kv_3"
where app_id='620' group by model, os order by cnt desc

test3

select SUBSTRING(app_id, 1, 2) AS "app_id_substring", model, os, count(*) as cnt from "tencent-json_kv_3"
where app_id='620' group by 1, model, os order by cnt desc

test4

select count(DISTINCT os) as os, TIME_CEIL(__time, 'PT00H05M') from "tencent-json_kv_3"
where app_id='620'
group by 2

test5

select TIME_CEIL(__time, 'PT00H05M'), MAX(op) from "tencent-json_kv_3"
where app_id='620' AND op <= 9000 AND page_id IS NOT NULL
group by 1
order by 2

Summary

Druid is cumbersome to use. Many configuration options have to be tested and compared over and over, the recommended parameters differ by node type and situation, and tuning relies heavily on experience. In addition, query cache hits have a large effect on query performance: on a hit, queries can finish within seconds or even milliseconds.

As an OLAP system, Druid delivers the functions it needs to, and even exceeds the goals it set for itself. But configuration is too cumbersome: many options are effectively redundant, and the remaining ones could be derived entirely from a few parameters.

Because Druid is still growing, later improvements may arrive quickly. I hope Druid can become another flagship project of Apache.

To conclude, here is where Druid fits:

Druid is suitable when

  1. Data is inserted frequently and rarely updated or deleted
  2. Queries are mostly aggregation and grouping (Group By) queries, with some search and scan queries
  3. Query latency must be in the range of 100 milliseconds to a few seconds
  4. The data has a time field
  5. There are multiple tables, but each query hits only one large distributed table, possibly together with many small lookup tables
  6. The data contains high-cardinality columns (hard to translate, so the original term is kept; for example URLs or user IDs) that need fast counting and ranking
  7. Data needs to be loaded from Kafka, HDFS, or object storage

Druid is not suitable for

  1. Low-latency updates of existing records by primary key
  2. Offline data systems where latency does not matter at all
  3. Scenarios that involve large joins and can tolerate long query times
Copyright notice
This article was written by [charmer]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2021/04/20210416184427669d.html