当前位置:网站首页>kotlin 异步流
kotlin 异步流
2022-06-24 12:54:00 【day_moon】
//1.表示多个值 流用于返回多个异步计算值
fun foo(): List<Int> = listOf(1, 2, 3)
fun forEachList() {
foo().forEach { value -> println(value) }
}
//2.序列
fun foos(): Sequence<Int> = sequence {//使用一个序列(Sequence)来表示数字
for (i in 1..3) {
Thread.sleep(1000)//等待100毫秒
yield(i)//下一个值
}
}
fun forEachSequences() {
foos().forEach { value -> println(value) }
}
//3 挂起函数
suspend fun foo_Suspending(): List<Int> {//suspend修饰不会阻塞主线程 ,List<Int>我们只能同时返回所有值
delay(1000)
return listOf(1, 2, 3)
}
fun main_Suspending() = runBlocking {
foo_Suspending().forEach { value -> println(value) }
}
//4.Flows
fun foo_flows(): Flow<Int> = flow {//构造器函数名为 flow 不再标记 suspend 修饰符
for (i in 1..3) { //flow{...} 中的代码块可以挂起
delay(2000)
emit(i)//值通过 emit 函数从流中发出
}
}
fun main_flows() = runBlocking<Unit> {
launch {//用于检查主线程是否阻塞
for (k in 1..3) {
println("k $k")
delay(1000)//等待1000毫秒 不会阻塞主线程
}
}
foo_flows().collect { value -> println("$value") }// collect 函数从 flow 中取值
}
//5.流是冷的
fun foo_cold(): Flow<Int> = flow {
for (i in 1..3) {//flow 每次收集时都会启动
println("Flow开启")
delay(1000)
emit(i)
}
}
fun main_cold() = runBlocking {
val flows = foo_cold()
println("...$flows")
flows.collect { value -> println("$value") }//先开启,再打印值
println("...收集")
}
//6.取消流
fun foo_cancel(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
fun main_cancel() = runBlocking {
withTimeoutOrNull(1200) {//运行一个就取消了
foo_cancel().collect { value -> println("$value") }
}
println("end")
}
//7.流构建器 asFlow
fun main_asFlow() = runBlocking {
(1..3).asFlow().collect { value -> println("$value") }
}
//8.中间流运算符
suspend fun per_request(requst: Int): String {
delay(1000)
return "$requst"
}
fun main_map() = runBlocking {
(1..3).asFlow()//构建流
.map { request -> per_request(request) }//中间运算符
.collect { value -> println("$value") }
}
//9.转换操作符
suspend fun per_transform(requst: Int): String {
delay(1000)
return "$requst"
}
fun main_transform() = runBlocking {
(1..3).asFlow()//构建流
.transform { request ->
emit("request $request")//异步请求之前发出一个字符串和跟随一个响应
emit(per_transform(request))
}//中间运算符
.collect { value -> println("$value") }
}
//10 限长度运算符
fun number(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("end")//运行到这里关闭
emit(3)
} finally {
println("finally")
}
}
fun main_take() = runBlocking {
number().take(2).collect { value -> println("$value") }//take 限制长度
}
//11.流运算符
fun main_reduce()= runBlocking {
val sun= (1..3).asFlow()//构造流
.map { it*it }
.reduce { a, b ->a + b }//转结果为相加 然后返回
println("$sun")
}
//12 流是连续的 像流水线那样
fun main_flowd() = runBlocking {
(1..10).asFlow().filter { println("请求值 $it")
it % 2 != 0 }
.map {
println("返回值是 $it")
}.collect { value -> println("$value") }
}
//13 流上下文保留 也就是经过后 会有值出来 是保存的
fun main_save()= runBlocking {
(1..5).asFlow().filter {it%2==0}.map {
println("返回值是 $it")
}.collect { value -> println("最后返回 $value") }
}
//14 错误地使用 withContext
fun flow_withContext():Flow<Int> = flow {
withContext(Dispatchers.Default){//这行报错Flow invariant is violated
for (i in 1..3){
emit(i)
println("$i")
}
}
}
fun main_withContext()= runBlocking {
flow_withContext().collect { value -> println("$value") }
}
//15 flowOn 运算符 不能改变流的上下文
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun flow_flowOn():Flow<Int> = flow {
for (i in 1..3){
log("flow_flowOn $i")
emit(i)
}
}.flowOn(Dispatchers.IO)//在子线程执行
fun main_flowOn()= runBlocking {
log("主")//主线程执行
flow_flowOn().collect { value -> log("main_flowOn $value") }//主线程执行
}
//16 缓冲 减少收集时间
fun flow_buffer():Flow<Int> = flow {
for (i in 1..3){
delay(2000)//一共6000
emit(i)
}
}
fun main_buffer() = runBlocking {
val time= measureTimeMillis {
flow_buffer().buffer(1).collect { value ->
delay(300)//这一步收集 只用了300多
println("$value") }
}
println("所需要的时间 $time")//6326
}
//17合并 conflate
fun flows_conflate():Flow<Int> = flow {
for (i in 1..3){
emit(i)
}
}
fun main_conflate() = runBlocking {
flows_conflate().conflate().//第二个数字被合并(丢弃)
collect { value -> println("$value") }
}
//18 处理最新 collectLatest
fun flow_collectLatest():Flow<Int> = flow {
for (i in 1..3){
emit(i)
}
}
fun main_collectLatest()= runBlocking {
flow_collectLatest().collectLatest { value ->
delay(300)//延迟3秒
println("$value") }//只打印最后一个
}
//19 组合流 zip combine 组合流的作用
val one=(1..2).asFlow()
val two= flowOf("4","5","6")
fun main_zip() = runBlocking {
// one.zip(two){a,b->"$a->$b"}.collect { value -> println("$value") }//不会打印6
one.combine(two){a,b->"$a->$b"}.collect { value -> println("$value") }//2重复 然后跟6一起打印出来
}
//20 展平流
// flatMapConcat 等待内部流完成,然后开始收集下一个流
//flatMapMerge 同时收集所有传入流并将其值合并到单个流中,以便尽快发出值
//flatMapLatest 处理最新值
fun flow_Flattening(i: Int):Flow<String> = flow {
emit("old $i")
delay(1000)
emit("new $i")
}
fun main_Flattening() = runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }.flatMapLatest { flow_Flattening(it) }.collect { value ->
println("$value 收集时间 ${System.currentTimeMillis() - startTime} ms ")
}
}
//21 捕获异常流
//sampleStart
fun flow_catch(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { println("$value") }//制造异常
"string $value"
}
fun main_catch() = runBlocking<Unit> {
//1.用try catch
// try {
// flow_catch().collect { value -> println(value) }
// } catch (e: Throwable) {
// println("Caught $e")//有异常就捕获
// }
//2.catch
// flow_catch().catch { e-> emit("Caught $e") }.collect { value -> println("$value") }//用catch替代
//3.异常放在collect 也就是下游 不起作用
flow_catch().catch { e-> emit("Caught $e") }.collect { value ->
// check(value <= "1") { "Collected $value" }//
println(value)
}
//4.声明式捕获异常
flow_catch().onEach { value ->
check(value <= "1") { "检查 $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}//22.流完成 除了finally外,还有 onCompletion
fun flows_oncompletion():Flow<Int> = flow {
emit(1)//发出数字1后引发异常
throw RuntimeException()
}
fun main_onCompletion() = runBlocking {
flows_oncompletion().onCompletion {//onCompletion有个 Throwable 参数可用于确定流收集是正常完成还是异常完成
cause ->if (cause!=null) println("有异常出现") }
.catch { //onCompletion 运算符不处理异常 异常仍然会流向下游
cause -> println("异常是 $cause") }.collect { value -> println("$value") }
}
//23 启动流
fun main_even() = runBlocking {
(1..3).asFlow().onEach { delay(1000) }.onEach { event-> println("$event") //将一段代码注册为对传入事件的响应
}.collect( )//等待收集流完成
println("end ..")//等收集完成 才运行这个
}
//24 启动流
fun main_launchIn() = runBlocking {
(1..3).asFlow().onEach { delay(1000) }.onEach { event-> println("$event") //将一段代码注册为对传入事件的响应
}.launchIn( this)//等待收集流完成
println("end ..")//先运行这个
}边栏推荐
- Understanding openstack network
- Preparation and operation & Maintenance Guide for 'high concurrency & high performance & high availability service program'
- Sinomeni vine was selected as the "typical solution for digital technology integration and innovative application in 2021" of the network security center of the Ministry of industry and information te
- How can junior middle school developers effectively reduce their own workload?
- #yyds干货盘点# 解决剑指offer:调整数组顺序使奇数位于偶数前面(二)
- Source code analysis handler interview classic
- How long will it take to open a mobile account? Is online account opening safe?
- Vipshop's "special sale" business is no longer easy to do?
- 图扑软件数字孪生海上风电 | 向海图强,奋楫争先
- Golden age ticket: Web3.0 Security Manual
猜你喜欢

CVPR 2022 | 美团技术团队精选论文解读

Quickly understand the commonly used message summarization algorithms, and no longer have to worry about the thorough inquiry of the interviewer

系统测试主要步骤

常识知识点

面试官:MySQL 数据库查询慢,除了索引问题还可能是什么原因?

不用Home Assistant,智汀也开源接入HomeKit、绿米设备?

谁是鱼谁是饵?红队视角下蜜罐识别方式汇总

The data value reported by DTU cannot be filled into Tencent cloud database through Tencent cloud rule engine

CVPR 2022 | interprétation de certains documents de l'équipe technique de meituan

Comparator 排序函数式接口
随机推荐
工业物联网(IIoT)的八个主要趋势
CVPR 2022 - Interpretation of selected papers of meituan technical team
使用 Abp.Zero 搭建第三方登录模块(一):原理篇
10 reduce common "tricks"
Gateway processing flow of zuul source code analysis
Ask a question about SQL view
【AI玩家养成记】用AI识别邻居家旺财是什么品种
8 lines of code to teach you how to build an intelligent robot platform
ERR AUTH&lt; password&gt; called without anypassword configured for the default user. Ar
【5G NR】5G NR系统架构
Why is open source technology so popular in the development of audio and video streaming media platform?
华为AppLinking中统一链接的创建和使用
华为 PC 逆势增长,产品力决定一切
Huawei PC grows against the trend, and product power determines everything
Parti,谷歌的自回归文生图模型
How to create a new empty branch in the web development process of easyrtc?
手机开户后多久才能通过?在线开户安全么?
SAP QM qac1 transaction code cannot modify the quantity in the inspection lot containing Hu
天猫618农产品“百强县” 35个县域来自中西部及东北
Cmput 379 explanation