当前位置:网站首页>Logstash——Logstash将数据推送至Redis
Logstash——Logstash将数据推送至Redis
2022-06-26 05:59:00 【大·风】
redis
这篇主要介绍使用redis作为output的目标
可配置参数
字段 | 参数类型 | 说明 |
---|---|---|
batch | boolean | 当为true的时候Redis批处理值并发送1个RPUSH命令而不是每个值发送一个命令以推送到列表,要求data_type=“list” |
batch_events | number | 当batch为true的时候,此参数限制排队的RPUSH事件数 |
batch_timeout | number | 当batch为true的时候,此参数限制超时时间 |
congestion_interval | number | 阻塞检查的时间间隔 |
congestion_threshold | number | 当data_type是list的时候,并且拥有超过congestion_threshold项,则被阻塞,直到有人消费其。假如没有消费者,Redis将会耗尽内存 |
data_type | string | 可选参数[“list”, “channel”],其实就是Redis队列类型 |
db | number | Redis数据库号 |
host | array | 主机名,如果主机列表是一个数组,Logstash将随机选择一个主机进行连接,如果该主机断开连接,则将选择另一个主机。 |
key | string | Redis列表或通道的名称 |
password | password | 用于验证的密码 |
port | number | 端口号 |
reconnect_interval | number | 重连的时间间隔 |
shuffle_hosts | boolean | |
timeout | number | 超时时间 |
队列类型选择
data_type来确定redis队列类型。可选参数为list和channel。两者的具体区别可以查看在使用redis做数据源文章里面的介绍(Logstash从Redis中收集数据并输出)[https://blog.csdn.net/qq330983778/article/details/105757604]
开启批处理
目前仅当data_type为list时可以使用批处理。和批处理相关的参数batch、batch_events、batch_timeout。
batch
是否开启批处理,默认关闭
batch_events
单次命令推送多少条数据,默认为50
batch_timeout
单次命令等待多久,默认为5s
默认情况下,每次执行一次rpush命令会发送一条数据。开启批处理后,每条命令发送的数据量取决于batch_events,发送时间取决于batch_timeout。具体看两个条件哪一个先满足。
logstash的保护机制
目前仅当data_type为list时可以使用
congestion_threshold
logstash提供了一个参数congestion_threshold
,主要是限制了redis中数据项中最多可以存在多少元素,当list中的元素大于此参数设置的值后,则会阻塞直到有其他消费者消费list中的数据。此操作主要是防止在没有消费者的时候,list数据过大导致Redis耗尽内存。此参数默认为0,表示禁用阻塞检测
congestion_interval
此参数用来设置阻塞检测的频率。默认为1s。当设置为0的时候,表示每执行一次rpush则进行一次检测。
数据推送至redis的配置
下面是使用channel或者list的配置
使用channel
input {
redis {
key => "logstash-redis"
host => "localhost"
password => "dailearn"
port => 6379
db => "0"
data_type => "list"
type => "redis"
codec => plain{
charset=>"UTF-8"
}
}
}
filter {
json {
source => "message"
}
}
output{
redis {
host => "localhost" #这个是标明redis服务的地址
port => 6379
codec => plain
db => 0 #redis中的数据库,select的对象
key => "logstash-message"#redis中的键值
data_type => channel #一般就是list和channel
password => "dailearn"
}
}
使用list做批处理
input {
redis {
key => "logstash-redis"
host => "localhost"
password => "dailearn"
port => 6379
db => "0"
data_type => "list"
type => "redis"
codec => plain{
charset=>"UTF-8"
}
}
}
filter {
json {
source => "message"
}
}
output{
redis {
batch => true
batch_events => 50
batch_timeout => 5
host => "localhost" #这个是标明redis服务的地址
port => 6379
codec => plain
db => 0 #redis中的数据库,select的对象
key => "logstash-message"#redis中的键值
data_type => list #一般就是list和channel
password => "dailearn"
}
}
阻塞检查的使用
下面是使用list做批处理做阻塞检查的配置
input {
redis {
key => "logstash-redis"
host => "localhost"
password => "dailearn"
port => 6379
db => "0"
data_type => "list"
type => "redis"
codec => plain{
charset=>"UTF-8"
}
}
}
filter {
json {
source => "message"
}
}
output{
redis {
batch => true
batch_events => 50
batch_timeout => 5
host => "localhost" #这个是标明redis服务的地址
port => 6379
codec => plain
congestion_threshold => 5
congestion_interval => 1
db => 0 #redis中的数据库,select的对象
key => "logstash-message"#redis中的键值
data_type => list #一般就是list和channel
password => "dailearn"
}
stdout {
codec => rubydebug {
}}
}
测试阻塞检查
上面的配置中阻塞数被设置为5(congestion_threshold => 5),那么现在向logstash中依次发送数据
set one test message。time:1590462390513
set one test message。time:1590462391833
set one test message。time:1590462393645
set one test message。time:1590462394727
set one test message。time:1590462396589
...
虽然配置中设置的5个数据就被阻塞,实际上redis被插入了六条数据。
当被插入5条数据的时候,控制台输出正常,但是当redis中数据达到六条的时候控制台会频繁的刷新异常信息,并且此时将无法再次插入数据。当然需要注意,多余的数据只是被阻塞并没被丢弃,在尝试消费掉redis的数据后,阻塞的数据还是会被插入到redis中
关于检查频率的设置
当把congestion_interval的值设置比较大的时候,可能会使得实际插入的数据多余限制的值。比如上面例子依次发送数据修改为一次性发送,就会使得redis中实际的数据远远大于需要的数据
批处理和检查频率的冲突
上面例子中因为在一个检查周期内多条数据被插入,而此时根据文档我们可以设置congestion_interval => 0
来实现每个事件都进行一次检查,但是实际上再次模拟上面场景依旧会发现redis中被插入了远超设置的数据。这是因为之前的配置中batch => true
,在批处理配置下,一个事件包含了多条数据,所以虽然这个时候每个事件进行一次检测,但是依旧会插入超过限制的数据量。
个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,假如开发同学发现了,请及时告知,我会第一时间修改相关内容。假如我的这篇内容对你有任何帮助的话,麻烦给我点一个赞。你的点赞就是我前进的动力。
边栏推荐
- Multi thread synchronous downloading of network pictures
- The use of loops in SQL syntax
- Household accounting procedures (First Edition)
- Yamaha robot splits visual strings
- 【C語言】深度剖析數據在內存中的存儲
- 05. basic data type - Dict
- Unicloud cloud development obtains applet user openid
- 家庭记账程序(第一版)
- E-commerce seeks growth breakthrough with the help of small program technology
- 怎么把平板作为电脑的第二扩展屏幕
猜你喜欢
Implementation of third-party wechat authorized login for applet
Bingc (inheritance)
kolla-ansible部署openstack yoga版本
kolla-ansible部署openstack yoga版本
Pytorch (network model training)
Mysql-10 (key)
Kolla ansible deploy openstack Yoga version
The purpose of writing programs is to solve problems
Household accounting procedures (First Edition)
小程序如何关联微信小程序二维码,实现二码聚合
随机推荐
NPM private server problem of peanut shell intranet penetration mapping
SQL Server view
Getting started with Python
Written before father's Day
A new explanation of tcp/ip five layer protocol model
Unicloud cloud development obtains applet user openid
工厂方法模式、抽象工厂模式
适配器模式
SQL Server视图
Consul service registration and discovery
Day4 branch and loop
MEF framework learning record
On site commissioning - final method of kb4474419 for win7 x64 installation and vs2017 flash back
Pytorch (network model)
Machine learning 07: Interpretation of PCA and its sklearn source code
Day2- syntax basis and variables
Project suspension
Household accounting procedures (First Edition)
状态模式,身随心变
Cyclic displacement