当前位置:网站首页>kotlin 协程通道
kotlin 协程通道
2022-06-24 12:58:00 【day_moon】
// 1. Channels: a way to transfer a stream of values between coroutines.
/**
 * Launches a sender coroutine that sends 1+1, 2+2 and 3+3 over an [Int] channel and then
 * closes it, while the calling coroutine receives and prints exactly three values.
 */
fun main_channel() = runBlocking {
    val numbers = Channel<Int>() // the channel shared by sender and receiver
    launch {
        (1..3).forEach { n -> numbers.send(n + n) }
        // Signal that no more elements will be sent.
        numbers.close()
    }
    // Exactly three receives; iterating with `for (y in numbers)` is an alternative way to read.
    repeat(3) {
        val received = numbers.receive()
        println("接收到的是$received")
    }
    println("end ..")
}
// 2. Building a channel producer.
/**
 * Starts a producer coroutine in this scope that sends 1+1, 2+2 and 3+3.
 *
 * @return the receiving side of the producer's channel
 */
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {
    for (n in 1..3) {
        val doubled = n + n
        send(doubled)
    }
}
/**
 * Consumes every value emitted by [producer], printing each on its own line,
 * then prints "end ..".
 */
fun main_produce() = runBlocking {
    val values = producer()
    // Iterate over every element the producer emits.
    values.consumeEach { value -> println("$value") }
    println("end ..")
}
// 3. Pipelines.
/**
 * Starts a producer that sends 1, 2, 3, ... in an endless loop, each value one greater
 * than the previous one. The loop has no exit condition of its own.
 *
 * @return the receiving side of the producer's channel
 */
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce<Int> {
    var next = 1
    while (true) {
        send(next)
        next += 1
    }
}
/**
 * Pipeline stage: receives each value from [numbers], squares it and sends the result on.
 * The stage finishes when iteration over [numbers] ends.
 *
 * @param numbers upstream channel of integers to process
 * @return a channel carrying `x * x` for every `x` received from [numbers]
 */
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
    produce {
        // Multiply rather than add: `x + x` would double the value, contradicting the name.
        for (x in numbers) send(x * x)
    }
/**
 * Connects [produceNumbers] to [square], prints the first three values coming out of the
 * pipeline, prints "end ..", and then cancels the child coroutines.
 */
fun main_pip() = runBlocking {
    // The source stage feeds the processing stage.
    val squares = square(produceNumbers())
    repeat(3) { println(squares.receive()) } // take three results
    println("end ..") // done
    // The source loops forever, so cancel the children explicitly.
    coroutineContext.cancelChildren()
}
// 4. Prime numbers with a pipeline.
/**
 * Prints ten numbers, each prefixed with a space. Starting from the stream 2, 3, 4, ...,
 * every received number is printed and then used as the divisor of a new [filters] stage
 * that is appended to the pipeline.
 */
fun main_filters() = runBlocking {
    var stage = numberssend(2) // 2, 3, 4, ...
    repeat(10) { // take ten numbers
        val head = stage.receive()
        println(" $head")
        // Extend the pipeline: later values must also pass the filter for `head`.
        stage = filters(stage, head)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}
/**
 * Starts a producer that sends [start], then `start + 1`, `start + 2`, ... in an endless loop.
 *
 * @param start the first value to send
 */
fun CoroutineScope.numberssend(start: Int) = produce<Int> {
    // Endless ascending sequence; each element is sent as soon as it is generated.
    generateSequence(start) { it + 1 }.forEach { value -> send(value) }
}
// Sieve stage used by the prime pipeline: one such stage is started for each number found.
/**
 * Pipeline stage that forwards only those values from [numbers] that are NOT divisible
 * by [prime]; values divisible by [prime] are dropped.
 *
 * @param numbers upstream channel of candidates
 * @param prime divisor used to reject candidates
 */
fun CoroutineScope.filters(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (candidate in numbers) {
        val divisible = candidate % prime == 0
        if (!divisible) send(candidate)
    }
}
// 5. Fan-out.
/**
 * Starts five processor coroutines (ids 0..4) that all receive from one [produceNumber]
 * channel, waits 950 ms, then cancels the producer.
 */
fun mainFanout() = runBlocking<Unit> {
    val source = produceNumber()
    for (id in 0 until 5) {
        launchProcessor(id, source)
    }
    delay(950)
    source.cancel() // cancel the producer
}
/**
 * Starts a producer that sends 1, 2, 3, ... in an endless loop, pausing 100 ms after
 * each send.
 */
fun CoroutineScope.produceNumber() = produce<Int> {
    var counter = 0
    while (true) {
        counter += 1 // first value sent is 1
        send(counter)
        delay(100)
    }
}
/**
 * Launches a coroutine that prints every message it receives from [channel], tagged
 * with the processor [id].
 *
 * @param id number shown in the printed line
 * @param channel channel to read messages from
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    val inbox = channel.iterator()
    while (inbox.hasNext()) {
        val msg = inbox.next()
        println("Processor #$id received $msg")
    }
}
// 6. Fan-in.
/**
 * Launches two coroutines that send strings into one shared channel ("foo" every 200 ms,
 * "BAR!" every 500 ms), prints the first six strings received, then cancels the senders.
 */
fun main_in() = runBlocking {
    val shared = Channel<String>()
    // Two coroutines send strings into the same channel.
    listOf("foo" to 200L, "BAR!" to 500L).forEach { (text, pause) ->
        launch { sendString(shared, text, pause) }
    }
    for (i in 1..6) { // receive first six
        println(shared.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}
/**
 * Repeatedly waits [time] milliseconds and then sends [s] to [channel].
 * The loop has no exit condition, so the function does not return normally.
 *
 * @param channel channel to send into
 * @param s string sent on every iteration
 * @param time delay in milliseconds before each send
 */
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    do {
        delay(time)
        channel.send(s)
    } while (true)
}
// 7. Buffered channels.
/**
 * Launches a sender that tries to send 0..9 into a channel created with capacity 4,
 * printing a line before each send. Nothing receives from the channel; the sender is
 * cancelled after 100 ms.
 */
fun main_Buffered() = runBlocking<Unit> {
    val buffered = Channel<Int>(4) // capacity of 4
    val sender = launch { // start the sending coroutine
        for (value in 0 until 10) {
            // The original note expects 0-4 to be printed: four sends fit the buffer,
            // and the fifth send waits because nobody is receiving.
            println("发送的数据 $value")
            buffered.send(value)
        }
    }
    delay(100) // wait 100 ms (not one second)
    sender.cancel() // cancel the sender
}
// 8. Channels are fair.
/**
 * Ball passed between players over a channel.
 *
 * @property hits mutable counter that players increment each time they receive the ball
 */
data class Ball(
    var hits: Int
)
/**
 * Starts two [player] coroutines ("ping" and "pong") on one shared channel, serves a
 * [Ball] with zero hits, lets the game run for one second, then cancels both players.
 */
fun main_fair() = runBlocking {
    val table = Channel<Ball>() // a shared table
    for (name in listOf("ping", "pong")) {
        launch { player(name, table) }
    }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}
/**
 * Plays the game: for every ball received from [table], increments its hit counter,
 * prints the player [name] with the ball, waits 300 ms and sends the ball back to [table].
 *
 * @param name label printed in front of the ball
 * @param table channel used both to receive the ball and to send it back
 */
suspend fun player(name: String, table: Channel<Ball>) {
    val incoming = table.iterator()
    while (incoming.hasNext()) { // receive the ball in a loop
        val ball = incoming.next()
        ball.hits += 1
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
// 9. Ticker channels.
/**
 * Creates a ticker channel (100 ms delay, no initial delay) and polls it with a series of
 * timeouts (1 ms, 50 ms, 60 ms, then after a 150 ms pause 1 ms and 60 ms), printing what
 * each poll returned. A poll yields `null` when its timeout elapses before an element
 * arrives. The ticker is cancelled at the end.
 */
fun main_Ticker() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel

    // Waits at most `timeoutMillis` for the next tick; null if the timeout elapses first.
    suspend fun poll(timeoutMillis: Long) = withTimeoutOrNull(timeoutMillis) { tickerChannel.receive() }

    var nextElement = poll(1)
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = poll(50) // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = poll(60)
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)

    // Next element is available immediately
    nextElement = poll(1)
    println("Next element is available immediately after large consumer delay: $nextElement")

    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = poll(60)
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
边栏推荐
- I have fundamentally solved the problem of wechat occupying mobile memory
- Seven challenges faced by data scientists and Solutions
- Docker安装redis
- RAID5 array recovery case tutorial of a company in Shanghai
- How can the new webmaster avoid the ups and downs caused by SEO optimization?
- C语言中常量的定义和使用
- 如何避免严重网络安全事故的发生?
- Kotlin initialization block
- [one picture series] one picture to understand Tencent Qianfan ipaas
- Cloud native essay solicitation progress case practice
猜你喜欢

CVPR 2022 | 美团技术团队精选论文解读

Developer survey: rust/postgresql is the most popular, and PHP salary is low

Vulnerability management mistakes that CIOs still make

Golden age ticket: Web3.0 Security Manual

3. caller service call - dapr

**Unity中莫名其妙得小问题-灯光和天空盒

Opengauss kernel: simple query execution

Quickly understand the commonly used message summarization algorithms, and no longer have to worry about the thorough inquiry of the interviewer

Comparator sort functional interface

黄金年代入场券之《Web3.0安全手册》
随机推荐
【sdx62】WCN685X IPA注册失败问题分析及解决方案
Goldfish rhca memoirs: do447 manage lists and credentials -- create machine credentials for the access list host
RAID5 array recovery case tutorial of a company in Shanghai
39 - read XML node and attribute values
I have fundamentally solved the problem of wechat occupying mobile memory
Cloud native essay solicitation progress case practice
kotlin 异步流
Sphere, openai and ai21 jointly publish the best practice guidelines for deployment models
AGCO AI frontier promotion (6.24)
Process basic properties
Express 100 Express query interface (API) interface specification document - detailed version
SYSTEMd common component description
Vipshop's "special sale" business is no longer easy to do?
Beauty of script │ VBS introduction interactive practice
Kotlin coroutine context and scheduler
How does webrtc obtain video stream data on the C ++ side?
Kotlin asynchronous flow
如何避免严重网络安全事故的发生?
开发者调查:Rust/PostgreSQL 最受喜爱,PHP 薪水偏低
kotlin 关键字 扩展函数