当前位置:网站首页>kotlin 协程通道
kotlin 协程通道
2022-06-24 12:58:00 【day_moon】
// 1.通道 传输值流的方法 fun main_channel() = runBlocking { val channel = Channel<Int>()//声明一个channel launch { for (i in 1..3) { channel.send(i + i) }//发送1+1 2+2 3+3 channel.close()//没有立即关闭通道 } repeat(3) { // 这样写也是可以for (y in channel) println(y) println("接收到的是${channel.receive()}") } println("end ..") } //2.构建通道生产者 fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {//构建通道生产者 for (i in 1..3) send(i + i) } fun main_produce() = runBlocking { val produce = producer() produce.consumeEach { println("$it") }//迭代每一个生产者 println("end ..") } //3 管道 fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce<Int> {//每次加1 生成无穷个 没有返回类型 var x = 1 while (true) send(x++) } fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {//先收到的,然后处理后再发出 for (x in numbers) send(x + x) } fun main_pip() = runBlocking { val numbers = produceNumbers() //每次加1 生成无穷个 没有返回类型 val squares = square(numbers) //先收到的,然后处理后再发出 repeat(3) {//重复3次 println(squares.receive()) } println("end ..") //完成 coroutineContext.cancelChildren() // 取消子协程 } //4.管道的素数 fun main_filters() = runBlocking { var cur = numberssend(2)//从二开始 2 3 4.. repeat(10) {//取10个数 val number = cur.receive() println(" $number") cur = filters(cur, number) } coroutineContext.cancelChildren() // cancel all children to let main finish } fun CoroutineScope.numberssend(start: Int) = produce<Int> {//数据每次加1发送 var x = start while (true) send(x++) } //从2开始一个数字流,从当前通道获取一个质数,并为找到的每个质数启动新的管道 fun CoroutineScope.filters(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { for (x in numbers) if (x % prime != 0) send(x)//可以被发送的数整除 然后发送 } //5.扇出 fun mainFanout() = runBlocking<Unit> { val producer = produceNumber() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() //取消 } fun CoroutineScope.produceNumber() = produce<Int> { var x = 1 while (true) { send(x++) //从1开始和每次加1 delay(100) // } } fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { println("Processor #$id received $msg") } } //6.扇入 fun main_in() = runBlocking { val channel = Channel<String>() //两个协程来发送字符串 launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { // receive first six println(channel.receive()) } coroutineContext.cancelChildren() // cancel all children to let main finish } suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } } //7.带缓冲的通道 fun main_Buffered() = runBlocking<Unit> { val channel = Channel<Int>(4) //容量为4 val sender = launch { // 启动协程 repeat(10) { println("发送的数据 $it")//0-4 channel.send(it) } } delay(100)//延迟1秒 sender.cancel() //取消 } //8.通道是公平的 data class Ball(var hits: Int) fun main_fair() = runBlocking { val table = Channel<Ball>() // a shared table launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // serve the ball delay(1000) // delay 1 second coroutineContext.cancelChildren() // game over, cancel them } suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // receive the ball in a loop ball.hits++ println("$name $ball") delay(300) // wait a bit table.send(ball) // send the ball back } } //9.计时通道 fun main_Ticker() = runBlocking<Unit> { val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay println("Next element is not ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 100 ms: $nextElement") // Emulate large consumption delays println("Consumer pauses for 150ms") delay(150) // Next element is available immediately nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // Note that the pause between `receive` calls is taken into account and next element arrives faster nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() // indicate that no more elements are needed }
边栏推荐
- 不用Home Assistant,智汀也开源接入HomeKit、绿米设备?
- The introduction of MySQL memory parameters is divided into two categories: thread exclusive and global sharing
- Comparator sort functional interface
- How can the new webmaster avoid the ups and downs caused by SEO optimization?
- Manuel d'entrevue du gestionnaire de l'analyse des sources
- 开发者调查:Rust/PostgreSQL 最受喜爱,PHP 薪水偏低
- Use of kotlin arrays, collections, and maps
- 源码解析 Handler 面试宝典
- 3. caller service call - dapr
- Yyds dry goods counting solution sword finger offer: adjust the array order so that odd numbers precede even numbers (2)
猜你喜欢
青藤入选工信部网安中心“2021年数字技术融合创新应用典型解决方案”
Beauty of script │ VBS introduction interactive practice
每日一题day8-515. 在每个树行中找最大值
Definition and use of constants in C language
10 reduce common "tricks"
万用表测量电阻图解及使用注意事项
华为 PC 逆势增长,产品力决定一切
群晖向阿里云OSS同步
Party, Google's autoregressive Wensheng graph model
3. Caller 服务调用 - dapr
随机推荐
一个团队可以既做项目又做产品吗?
Cloud native essay solicitation progress case practice
《中国数据库安全能力市场洞察,2022》报告研究正式启动
金鱼哥RHCA回忆录:DO447管理项目和开展作业--为ansible剧本创建一个项目
Redis面试题
工业物联网(IIoT)的八个主要趋势
位于相同的分布式端口组但不同主机上的虚拟机无法互相通信
Android kotlin Encyclopedia
The second phase of freshman engineering education seminar is to enroll in the China 100 school peer program
Eight major trends in the industrial Internet of things (iiot)
kotlin 语言特性
How can junior middle school developers effectively reduce their own workload?
kotlin 数组、集合和 Map 的使用
How to avoid serious network security accidents?
Goldfish rhca memoirs: do447 managing projects and conducting operations -- creating a project for ansible scripts
SYSTEMd common component description
[log service CLS] Tencent cloud log service CLS accesses CDN
Kotlin interface generic covariant inversion
Ask a question about SQL view
TCP triple handshake