当前位置:网站首页>kotlin 协程通道
kotlin 协程通道
2022-06-24 12:58:00 【day_moon】
// 1. Channels: a way to transfer a stream of values between coroutines.
/**
 * Basic channel demo: a child coroutine sends 2, 4 and 6 through a channel and
 * then closes it, while the enclosing coroutine receives and prints exactly
 * three values followed by an end marker.
 */
fun main_channel() = runBlocking {
    val channel = Channel<Int>() // channel shared by the sender and the receiver
    launch {
        // Sends 1+1, 2+2 and 3+3.
        (1..3).forEach { n -> channel.send(n + n) }
        // Closing does not discard elements that were already sent.
        channel.close()
    }
    // Iterating with `for (y in channel)` would also work and stops once the channel is closed.
    for (round in 1..3) {
        println("接收到的是${channel.receive()}")
    }
    println("end ..")
}
// 2. Building a channel producer.
/**
 * Starts a producer coroutine in this scope that sends 2, 4 and 6.
 *
 * @return the receiving side of the channel fed by the producer.
 */
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {
    var n = 1
    while (n <= 3) {
        send(n + n)
        n++
    }
}
/**
 * Consumes every value emitted by [producer], printing each one on its own
 * line, then prints an end marker.
 */
fun main_produce() = runBlocking {
    // consumeEach iterates over everything the producer sends.
    producer().consumeEach { println("$it") }
    println("end ..")
}
// 3. Pipelines.
/**
 * Starts a producer that sends 1, 2, 3, ... without ever finishing on its own.
 *
 * @return the receiving side of the unbounded number stream.
 */
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce {
    var next = 1
    while (true) {
        send(next)
        next += 1
    }
}
/**
 * Pipeline stage that receives each value from [numbers], squares it, and
 * sends the result downstream.
 *
 * The previous body sent `x + x` (doubling), which contradicted the function
 * name; it now sends `x * x`.
 *
 * @param numbers upstream channel to read from.
 * @return the receiving side of the channel carrying the squared values.
 */
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
    produce {
        // Receive first, transform, then re-emit.
        for (x in numbers) send(x * x)
    }
/**
 * Wires [produceNumbers] into [square], prints the first three results of the
 * pipeline, and then cancels the still-running child coroutines.
 */
fun main_pip() = runBlocking {
    // Unbounded source piped straight into the transforming stage.
    val squares = square(produceNumbers())
    for (round in 1..3) {
        println(squares.receive())
    }
    println("end ..") // done
    // The producers never finish by themselves, so cancel them to let runBlocking return.
    coroutineContext.cancelChildren()
}
// 4. Prime numbers with a pipeline.
/**
 * Prints ten numbers by repeatedly taking the head of the current stream and
 * wrapping the stream in a new [filters] stage keyed on that head. The stream
 * starts at 2 (see [numberssend]).
 */
fun main_filters() = runBlocking {
    var stream = numberssend(2) // 2, 3, 4, ...
    for (round in 1..10) {
        val number = stream.receive()
        println(" $number")
        // Every later value now also has to pass the filter for `number`.
        stream = filters(stream, number)
    }
    // Cancel all children so that the main coroutine can finish.
    coroutineContext.cancelChildren()
}
/**
 * Starts a producer that sends [start], [start] + 1, [start] + 2, ... without
 * ever finishing on its own.
 *
 * @param start first value to send.
 */
fun CoroutineScope.numberssend(start: Int) = produce<Int> {
    // Lazily generated, never-ending sequence; each element is sent as it is produced.
    generateSequence(start) { it + 1 }.forEach { send(it) }
}
// Starting from a stream of numbers beginning at 2, a prime is taken from the
// current channel and a new pipeline stage is launched for each prime found.
/**
 * Pipeline stage that forwards only the values of [numbers] that are NOT
 * divisible by [prime].
 *
 * @param numbers upstream channel to read from.
 * @param prime divisor whose multiples are dropped.
 */
fun CoroutineScope.filters(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (candidate in numbers) {
        if (candidate % prime == 0) continue // multiple of prime: drop it
        send(candidate)
    }
}
// 5. Fan-out: several coroutines receive from the same channel.
/**
 * Launches five processors that all read from one [produceNumber] channel,
 * lets them run for 950 ms, and then cancels the producer.
 */
fun mainFanout() = runBlocking<Unit> {
    val source = produceNumber()
    for (id in 0 until 5) {
        launchProcessor(id, source)
    }
    delay(950)
    source.cancel() // cancel the producer
}
/**
 * Starts a producer that sends 1, 2, 3, ... and pauses 100 ms after each
 * value.
 */
fun CoroutineScope.produceNumber() = produce<Int> {
    var counter = 0
    while (true) {
        counter += 1
        send(counter) // starts at 1 and grows by 1 each time
        delay(100)
    }
}
/**
 * Launches a coroutine that prints every message it receives from [channel],
 * tagged with [id].
 *
 * @param id number identifying this processor in the output.
 * @param channel channel to read messages from.
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    // Explicit iterator form of `for (msg in channel)`.
    val incoming = channel.iterator()
    while (incoming.hasNext()) {
        val msg = incoming.next()
        println("Processor #$id received $msg")
    }
}
// 6. Fan-in: several coroutines send to the same channel.
/**
 * Starts two senders that write different strings to one channel at different
 * intervals, prints the first six strings received, and then cancels the
 * senders.
 */
fun main_in() = runBlocking {
    val channel = Channel<String>()
    // Two coroutines send strings, each with its own delay.
    listOf("foo" to 200L, "BAR!" to 500L).forEach { (text, period) ->
        launch { sendString(channel, text, period) }
    }
    for (round in 1..6) { // receive the first six
        println(channel.receive())
    }
    // Cancel all children so that the main coroutine can finish.
    coroutineContext.cancelChildren()
}
/**
 * Repeatedly waits [time] milliseconds and then sends [s] to [channel].
 * The loop has no exit condition of its own.
 *
 * @param channel channel to send to.
 * @param s string sent on every iteration.
 * @param time delay before each send, in milliseconds.
 */
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    do {
        delay(time)
        channel.send(s)
    } while (true)
}
// 7. Buffered channels.
/**
 * Shows that a sender can get ahead of a (missing) receiver by as many
 * elements as the buffer holds: the sender tries to send ten values into a
 * channel of capacity 4 that nobody reads, and is cancelled after 100 ms.
 */
fun main_Buffered() = runBlocking<Unit> {
    val capacity = 4
    val channel = Channel<Int>(capacity)
    val sender = launch {
        for (it in 0 until 10) {
            println("发送的数据 $it") // per the original note, 0..4 get printed before the sender blocks
            channel.send(it)
        }
    }
    delay(100) // wait 100 ms
    sender.cancel() // cancel the sender
}
// 8. Channels are fair.
/**
 * The ball passed between players.
 *
 * @property hits hit counter; mutable because players increment it in place.
 */
data class Ball(
    var hits: Int
)
/**
 * Ping-pong demo: two [player] coroutines share one channel (the table), a
 * ball is served onto it, and after one second both players are cancelled.
 */
fun main_fair() = runBlocking {
    val table = Channel<Ball>() // a shared table
    for (name in listOf("ping", "pong")) {
        launch { player(name, table) }
    }
    table.send(Ball(hits = 0)) // serve the ball
    delay(1000) // let the game run for one second
    coroutineContext.cancelChildren() // game over, cancel the players
}
/**
 * Receives balls from [table] in a loop; for each one it increments the hit
 * counter, prints [name] with the ball, waits 300 ms and sends the ball back.
 *
 * @param name label printed in front of each ball.
 * @param table channel used both to receive the ball and to return it.
 */
suspend fun player(name: String, table: Channel<Ball>) {
    val rallies = table.iterator()
    while (rallies.hasNext()) { // receive the ball in a loop
        val ball = rallies.next()
        ball.hits += 1
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
// 9. Ticker channels.
// Demonstrates a ticker channel created with a 100 ms period and no initial delay:
// each step tries to receive one element within a timeout and prints the result
// (withTimeoutOrNull yields null when the timeout expires first).
fun main_Ticker() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create the ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // no initial delay was requested
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have a 100 ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } // 50 ms + 60 ms exceeds the 100 ms period
println("Next element is ready in 100 ms: $nextElement")
// Emulate a large consumption delay
println("Consumer pauses for 150ms")
delay(150)
// The next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account, so the next element arrives sooner
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}边栏推荐
- Hardware development notes (6): basic process of hardware development, making a USB to RS232 module (5): creating USB package library and associating principle graphic devices
- 服务可见可观测性
- 硬件开发笔记(六): 硬件开发基本流程,制作一个USB转RS232的模块(五):创建USB封装库并关联原理图元器件
- Without home assistant, zhiting can also open source access homekit and green rice devices?
- Goldfish rhca memoirs: do447 managing projects and conducting operations -- creating a project for ansible scripts
- 2022煤矿瓦斯抽采操作证考试题及模拟考试
- Android kotlin Encyclopedia
- Activity生命周期
- Source code analysis handler interview classic
- [log service CLS] Tencent cloud log service CLS accesses CDN
猜你喜欢

Getting started with the go Cobra command line tool

Hands on data analysis unit 3 model building and evaluation

Sinomeni vine was selected as the "typical solution for digital technology integration and innovative application in 2021" of the network security center of the Ministry of industry and information technology

Vulnerability management mistakes that CIOs still make

Opengauss kernel: simple query execution

Unity中莫名其妙的小问题-灯光和天空盒

Hardware development notes (6): basic process of hardware development, making a USB to RS232 module (5): creating USB package library and associating principle graphic devices

Eight major trends in the industrial Internet of things (iiot)

吉时利静电计宽测量范围

3. caller service call - dapr
随机推荐
‘高并发&高性能&高可用服务程序’编写及运维指南
Getting started with the go Cobra command line tool
These default routes and static routes can not be configured and deployed. What kind of network workers are they!
kotlin 继承、类、重载
Golden age ticket: Web3.0 Security Manual
Activity lifecycle
Sphere, openai and ai21 jointly publish the best practice guidelines for deployment models
数据科学家面临的七大挑战及解决方法
SYSTEMd common component description
RAID5 array recovery case tutorial of a company in Shanghai
图扑软件数字孪生海上风电 | 向海图强,奋楫争先
One hour is worth seven days! Ingenuity in the work of programmers
工业物联网(IIoT)的八个主要趋势
kotlin 数组、集合和 Map 的使用
C语言中常量的定义和使用
8 lines of code to teach you how to build an intelligent robot platform
How can the new webmaster avoid the ups and downs caused by SEO optimization?
How to avoid serious network security accidents?
[AI player cultivation record] use AI to identify what kind of wealth is next door
Sinomeni vine was selected as the "typical solution for digital technology integration and innovative application in 2021" of the network security center of the Ministry of industry and information technology