当前位置:网站首页>kotlin 异步流
kotlin 异步流
2022-06-24 12:54:00 【day_moon】
//1.表示多个值 流用于返回多个异步计算值
fun foo(): List<Int> = listOf(1, 2, 3)
fun forEachList() {
foo().forEach { value -> println(value) }
}
//2.序列
fun foos(): Sequence<Int> = sequence {//使用一个序列(Sequence)来表示数字
for (i in 1..3) {
Thread.sleep(1000)//等待100毫秒
yield(i)//下一个值
}
}
fun forEachSequences() {
foos().forEach { value -> println(value) }
}
//3 挂起函数
suspend fun foo_Suspending(): List<Int> {//suspend修饰不会阻塞主线程 ,List<Int>我们只能同时返回所有值
delay(1000)
return listOf(1, 2, 3)
}
fun main_Suspending() = runBlocking {
foo_Suspending().forEach { value -> println(value) }
}
//4.Flows
fun foo_flows(): Flow<Int> = flow {//构造器函数名为 flow 不再标记 suspend 修饰符
for (i in 1..3) { //flow{...} 中的代码块可以挂起
delay(2000)
emit(i)//值通过 emit 函数从流中发出
}
}
fun main_flows() = runBlocking<Unit> {
launch {//用于检查主线程是否阻塞
for (k in 1..3) {
println("k $k")
delay(1000)//等待1000毫秒 不会阻塞主线程
}
}
foo_flows().collect { value -> println("$value") }// collect 函数从 flow 中取值
}
//5.流是冷的
fun foo_cold(): Flow<Int> = flow {
for (i in 1..3) {//flow 每次收集时都会启动
println("Flow开启")
delay(1000)
emit(i)
}
}
fun main_cold() = runBlocking {
val flows = foo_cold()
println("...$flows")
flows.collect { value -> println("$value") }//先开启,再打印值
println("...收集")
}
//6.取消流
fun foo_cancel(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
fun main_cancel() = runBlocking {
withTimeoutOrNull(1200) {//运行一个就取消了
foo_cancel().collect { value -> println("$value") }
}
println("end")
}
//7.流构建器 asFlow
fun main_asFlow() = runBlocking {
(1..3).asFlow().collect { value -> println("$value") }
}
//8.中间流运算符
suspend fun per_request(requst: Int): String {
delay(1000)
return "$requst"
}
fun main_map() = runBlocking {
(1..3).asFlow()//构建流
.map { request -> per_request(request) }//中间运算符
.collect { value -> println("$value") }
}
//9.转换操作符
suspend fun per_transform(requst: Int): String {
delay(1000)
return "$requst"
}
fun main_transform() = runBlocking {
(1..3).asFlow()//构建流
.transform { request ->
emit("request $request")//异步请求之前发出一个字符串和跟随一个响应
emit(per_transform(request))
}//中间运算符
.collect { value -> println("$value") }
}
//10 限长度运算符
fun number(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("end")//运行到这里关闭
emit(3)
} finally {
println("finally")
}
}
fun main_take() = runBlocking {
number().take(2).collect { value -> println("$value") }//take 限制长度
}
//11.流运算符
fun main_reduce()= runBlocking {
val sun= (1..3).asFlow()//构造流
.map { it*it }
.reduce { a, b ->a + b }//转结果为相加 然后返回
println("$sun")
}
//12 流是连续的 像流水线那样
fun main_flowd() = runBlocking {
(1..10).asFlow().filter { println("请求值 $it")
it % 2 != 0 }
.map {
println("返回值是 $it")
}.collect { value -> println("$value") }
}
//13 流上下文保留 也就是经过后 会有值出来 是保存的
fun main_save()= runBlocking {
(1..5).asFlow().filter {it%2==0}.map {
println("返回值是 $it")
}.collect { value -> println("最后返回 $value") }
}
//14 错误地使用 withContext
fun flow_withContext():Flow<Int> = flow {
withContext(Dispatchers.Default){//这行报错Flow invariant is violated
for (i in 1..3){
emit(i)
println("$i")
}
}
}
fun main_withContext()= runBlocking {
flow_withContext().collect { value -> println("$value") }
}
//15 flowOn 运算符 不能改变流的上下文
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun flow_flowOn():Flow<Int> = flow {
for (i in 1..3){
log("flow_flowOn $i")
emit(i)
}
}.flowOn(Dispatchers.IO)//在子线程执行
fun main_flowOn()= runBlocking {
log("主")//主线程执行
flow_flowOn().collect { value -> log("main_flowOn $value") }//主线程执行
}
//16 缓冲 减少收集时间
fun flow_buffer():Flow<Int> = flow {
for (i in 1..3){
delay(2000)//一共6000
emit(i)
}
}
fun main_buffer() = runBlocking {
val time= measureTimeMillis {
flow_buffer().buffer(1).collect { value ->
delay(300)//这一步收集 只用了300多
println("$value") }
}
println("所需要的时间 $time")//6326
}
//17合并 conflate
fun flows_conflate():Flow<Int> = flow {
for (i in 1..3){
emit(i)
}
}
fun main_conflate() = runBlocking {
flows_conflate().conflate().//第二个数字被合并(丢弃)
collect { value -> println("$value") }
}
//18 处理最新 collectLatest
fun flow_collectLatest():Flow<Int> = flow {
for (i in 1..3){
emit(i)
}
}
fun main_collectLatest()= runBlocking {
flow_collectLatest().collectLatest { value ->
delay(300)//延迟3秒
println("$value") }//只打印最后一个
}
//19 组合流 zip combine 组合流的作用
val one=(1..2).asFlow()
val two= flowOf("4","5","6")
fun main_zip() = runBlocking {
// one.zip(two){a,b->"$a->$b"}.collect { value -> println("$value") }//不会打印6
one.combine(two){a,b->"$a->$b"}.collect { value -> println("$value") }//2重复 然后跟6一起打印出来
}
//20 展平流
// flatMapConcat 等待内部流完成,然后开始收集下一个流
//flatMapMerge 同时收集所有传入流并将其值合并到单个流中,以便尽快发出值
//flatMapLatest 处理最新值
fun flow_Flattening(i: Int):Flow<String> = flow {
emit("old $i")
delay(1000)
emit("new $i")
}
fun main_Flattening() = runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }.flatMapLatest { flow_Flattening(it) }.collect { value ->
println("$value 收集时间 ${System.currentTimeMillis() - startTime} ms ")
}
}
//21 捕获异常流
//sampleStart
fun flow_catch(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { println("$value") }//制造异常
"string $value"
}
fun main_catch() = runBlocking<Unit> {
//1.用try catch
// try {
// flow_catch().collect { value -> println(value) }
// } catch (e: Throwable) {
// println("Caught $e")//有异常就捕获
// }
//2.catch
// flow_catch().catch { e-> emit("Caught $e") }.collect { value -> println("$value") }//用catch替代
//3.异常放在collect 也就是下游 不起作用
flow_catch().catch { e-> emit("Caught $e") }.collect { value ->
// check(value <= "1") { "Collected $value" }//
println(value)
}
//4.声明式捕获异常
flow_catch().onEach { value ->
check(value <= "1") { "检查 $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}//22.流完成 除了finally外,还有 onCompletion
fun flows_oncompletion():Flow<Int> = flow {
emit(1)//发出数字1后引发异常
throw RuntimeException()
}
fun main_onCompletion() = runBlocking {
flows_oncompletion().onCompletion {//onCompletion有个 Throwable 参数可用于确定流收集是正常完成还是异常完成
cause ->if (cause!=null) println("有异常出现") }
.catch { //onCompletion 运算符不处理异常 异常仍然会流向下游
cause -> println("异常是 $cause") }.collect { value -> println("$value") }
}
//23 启动流
fun main_even() = runBlocking {
(1..3).asFlow().onEach { delay(1000) }.onEach { event-> println("$event") //将一段代码注册为对传入事件的响应
}.collect( )//等待收集流完成
println("end ..")//等收集完成 才运行这个
}
//24 启动流
fun main_launchIn() = runBlocking {
(1..3).asFlow().onEach { delay(1000) }.onEach { event-> println("$event") //将一段代码注册为对传入事件的响应
}.launchIn( this)//等待收集流完成
println("end ..")//先运行这个
}边栏推荐
- Who is the fish and who is the bait? Summary of honeypot recognition methods from the perspective of red team
- C语言中常量的定义和使用
- 华为 PC 逆势增长,产品力决定一切
- Kotlin initialization block
- Sinomeni vine was selected as the "typical solution for digital technology integration and innovative application in 2021" of the network security center of the Ministry of industry and information te
- Cloud native essay solicitation progress case practice
- 【AI玩家养成记】用AI识别邻居家旺财是什么品种
- CVPR 2022 | 美團技術團隊精選論文解讀
- CVPR 2022 | 美团技术团队精选论文解读
- 首席信息安全官仍然会犯的漏洞管理错误
猜你喜欢

Definition and use of constants in C language

CVPR 2022 - Interpretation of selected papers of meituan technical team

Hands on data analysis unit 3 model building and evaluation

3. caller service call - dapr

CVPR 2022 | 美團技術團隊精選論文解讀

Main steps of system test

Comparator sort functional interface

爱可可AI前沿推介(6.24)

Developer survey: rust/postgresql is the most popular, and PHP salary is low

Understanding openstack network
随机推荐
面试官:MySQL 数据库查询慢,除了索引问题还可能是什么原因?
Beauty of script │ VBS introduction interactive practice
这几个默认路由、静态路由的配置部署都不会,还算什么网工!
CVPR 2022 | interprétation de certains documents de l'équipe technique de meituan
Creation and use of unified links in Huawei applinking
发扬连续作战优良作风 全力以赴确保北江大堤安全
系统测试主要步骤
华为 PC 逆势增长,产品力决定一切
kotlin 继承、类、重载
Integrated API interface code of domestic express companies for intra city distribution and ordering - Express 100
《中国数据库安全能力市场洞察,2022》报告研究正式启动
Getting started with the lvgl Library - colors and images
MySQL interview questions
The agile way? Is agile development really out of date?
黄楚平主持召开定点联系珠海工作视频会议 坚决落实省委部署要求 确保防疫情、稳经济、保安全取得积极成效
首席信息安全官仍然会犯的漏洞管理错误
Kotlin anonymous function and lambda
华为AppLinking中统一链接的创建和使用
kotlin 语言特性
Teach you how to use airtestide to connect your mobile phone wirelessly!