当前位置:网站首页>Kotlin asynchronous flow
Kotlin asynchronous flow
2022-06-24 13:39:00 【day_ moon】
// 1. Representing multiple values: a plain List returns every value at once, synchronously.
//    Flows (below) are used to return multiple values that are computed asynchronously.
fun foo(): List<Int> {
    return (1..3).toList()
}
/** Prints every element returned by [foo], one per line. */
fun forEachList() {
    for (value in foo()) {
        println(value)
    }
}
// 2. Sequence
/**
 * Produces the numbers 1..3 lazily as a [Sequence].
 * Each value is preceded by a blocking sleep, so the consuming thread is blocked while waiting.
 */
fun foos(): Sequence<Int> = sequence {
    (1..3).forEach { number ->
        Thread.sleep(1000) // block the current thread for 1000 ms before each value
        yield(number) // hand the next value to the consumer
    }
}
/** Consumes the sequence from [foos] and prints each value as it is produced. */
fun forEachSequences() {
    for (value in foos()) {
        println(value)
    }
}
// 3. Suspending function
/**
 * Suspends (instead of blocking the thread) for 1000 ms, then returns the result.
 * Because the return type is List<Int>, all values can only be returned at once.
 */
suspend fun foo_Suspending(): List<Int> {
    delay(1000)
    val values = listOf(1, 2, 3)
    return values
}
/** Runs [foo_Suspending] inside runBlocking and prints each returned value. */
fun main_Suspending() = runBlocking {
    val values = foo_Suspending()
    values.forEach { println(it) }
}
// 4. Flows
/**
 * Builds a flow of 1..3 with the `flow { ... }` builder.
 * The function itself is not marked `suspend`; the code inside the builder block may suspend.
 */
fun foo_flows(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(2000) // suspend for 2000 ms before each value
        emit(number) // values are emitted from the flow with emit
    }
}
/**
 * Collects [foo_flows] while a sibling coroutine prints a counter,
 * which shows that waiting for the flow does not block the thread.
 */
fun main_flows() = runBlocking<Unit> {
    // Concurrent coroutine used to check whether the thread is blocked.
    launch {
        (1..3).forEach { k ->
            println("k $k")
            delay(1000) // suspends for 1000 ms without blocking the thread
        }
    }
    // collect receives the values emitted by the flow.
    val numbers = foo_flows()
    numbers.collect { value -> println("$value") }
}
// 5. Flows are cold: the code inside flow { ... } does not run until the flow is collected,
//    and it starts again from the beginning for every collection.
/**
 * Cold flow of 1..3 that announces when it starts running.
 * The start message is printed once per collection, before the first element.
 */
fun foo_cold(): Flow<Int> = flow {
    // Printed once per collection (it used to sit inside the loop and repeat for every element).
    println("Flow Turn on ")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}
/** Shows that calling [foo_cold] only builds the flow; its body runs when collect is called. */
fun main_cold() = runBlocking {
    val flows = foo_cold() // nothing is emitted yet
    println("...$flows")
    // Collection starts the flow: its start message comes first, then the values.
    flows.collect { println("$it") }
    println("... collect ")
}
// 6. Cancelling a flow
/** Emits 1..3, suspending for 1000 ms before each value. */
fun foo_cancel(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(1000)
        emit(number)
    }
}
/**
 * Collects [foo_cancel] under a 1200 ms timeout.
 * Values arrive 1000 ms apart, so the collection is cancelled after the first one.
 */
fun main_cancel() = runBlocking {
    withTimeoutOrNull(1200) {
        val numbers = foo_cancel()
        numbers.collect { println("$it") }
    }
    println("end")
}
// 7. Flow builders: asFlow turns a range (or collection) into a flow
fun main_asFlow() = runBlocking {
    val numbers = (1..3).asFlow()
    numbers.collect { println("$it") }
}
// 8. Intermediate flow operators
/** Simulates a slow request: suspends for 1000 ms, then returns the request number as a string. */
suspend fun per_request(requst: Int): String {
    delay(1000)
    return requst.toString()
}
/** Maps each number of the flow through the suspending [per_request] and prints the responses. */
fun main_map() = runBlocking {
    val requests = (1..3).asFlow() // build the flow
    val responses = requests.map { request -> per_request(request) } // intermediate operator
    responses.collect { println("$it") }
}
// 9. The transform operator
/** Simulates a slow request: suspends for 1000 ms, then returns the request number as a string. */
suspend fun per_transform(requst: Int): String {
    delay(1000)
    val response = "$requst"
    return response
}
/** Uses transform to emit two values per input: a marker string and then the response. */
fun main_transform() = runBlocking {
    val requests = (1..3).asFlow() // build the flow
    val results = requests.transform { request ->
        // Emit a marker string before the asynchronous request, then emit its response.
        emit("request $request")
        emit(per_transform(request))
    }
    results.collect { println("$it") }
}
// 10. Size-limiting operators (take)
// Emits 1, 2 and 3 inside try/finally so that the cleanup in `finally` is visible when a
// size-limiting collector stops the flow early.
fun number(): Flow<Int> = flow {
try {
emit(1)
emit(2)
// When collected through take(2), as main_take does, the flow is cancelled after the second
// value has been delivered, so this line and emit(3) are not reached in that case.
println("end")
emit(3)
} finally {
// Runs whether the flow finishes normally or is cancelled.
println("finally")
}
}
/** Collects only the first two values of [number]; take limits the length of the flow. */
fun main_take() = runBlocking {
    val firstTwo = number().take(2)
    firstTwo.collect { println("$it") }
}
// 11. Terminal flow operators
/** Squares 1..3 and adds the squares together with the terminal operator reduce. */
fun main_reduce() = runBlocking {
    val squares = (1..3).asFlow() // build the flow
        .map { n -> n * n }
    // reduce folds the values into a single result (here: their sum) and returns it.
    val sum = squares.reduce { acc, next -> acc + next }
    println("$sum")
}
// 12. Flows are sequential: each value travels through the whole operator chain, like an assembly line
/**
 * Filters the odd numbers of 1..10, logs each one in map and prints the values that reach the collector.
 */
fun main_flowd() = runBlocking {
    (1..10).asFlow()
        .filter {
            println(" Request value $it")
            it % 2 != 0 // keep odd values only
        }
        .map {
            println(" The return value is $it")
            // Pass the value on. Ending the lambda with println mapped every element to Unit,
            // so the collector printed "kotlin.Unit" instead of the number.
            it
        }
        .collect { value -> println("$value") }
}
// 13. Flow context preservation: a flow is collected in the context of the coroutine that calls collect
/**
 * Filters the even numbers of 1..5, logs each one in map and prints the value received by the collector.
 */
fun main_save() = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 0 }
        .map {
            println(" The return value is $it")
            // Pass the value on. Ending the lambda with println mapped every element to Unit,
            // so the collector printed "kotlin.Unit" instead of the number.
            it
        }
        .collect { value -> println(" Finally back to $value") }
}
// 14. Misusing withContext inside a flow builder
/**
 * Deliberately wrong example: emitting from inside withContext changes the context of emission,
 * which fails at runtime with "Flow invariant is violated".
 */
fun flow_withContext(): Flow<Int> = flow {
    withContext(Dispatchers.Default) { // this is the call that triggers the error
        (1..3).forEach { i ->
            emit(i)
            println("$i")
        }
    }
}
/** Collects [flow_withContext] to show the runtime error caused by the withContext misuse. */
fun main_withContext() = runBlocking {
    val broken = flow_withContext()
    broken.collect { println("$it") }
}
// 15. The flowOn operator: the correct way to change the context in which a flow emits
/** Prints [msg] prefixed with the name of the current thread. */
fun log(msg: String) {
    val threadName = Thread.currentThread().name
    println("[$threadName] $msg")
}
/** Emits 1..3, logging each emission; flowOn moves this upstream work onto Dispatchers.IO. */
fun flow_flowOn(): Flow<Int> {
    val upstream = flow {
        (1..3).forEach { i ->
            log("flow_flowOn $i")
            emit(i)
        }
    }
    return upstream.flowOn(Dispatchers.IO) // the builder block runs on an IO dispatcher thread
}
/** Collects [flow_flowOn]; the log output shows which thread emits and which thread collects. */
fun main_flowOn() = runBlocking {
    log(" Lord ") // logged from the thread running runBlocking
    val numbers = flow_flowOn()
    // The collector also runs in the runBlocking context, not on the flowOn dispatcher.
    numbers.collect { log("main_flowOn $it") }
}
// 16. buffer: shortens total collection time by letting emission and collection overlap
/** Emits 1..3 with a 2000 ms pause before each value (6000 ms of emission time in total). */
fun flow_buffer(): Flow<Int> = flow {
    (1..3).forEach { i ->
        delay(2000)
        emit(i)
    }
}
/**
 * Measures how long it takes to collect [flow_buffer] through a buffer of capacity 1
 * with a collector that needs 300 ms per value.
 */
fun main_buffer() = runBlocking {
    val time = measureTimeMillis {
        flow_buffer()
            .buffer(1)
            .collect { value ->
                delay(300) // collecting each value takes a little over 300 ms
                println("$value")
            }
    }
    println(" The time required $time") // the author measured 6326 ms
}
// 17. Conflation (conflate)
/** Emits 1..3 back to back, without any delay between values. */
fun flows_conflate(): Flow<Int> = flow {
    (1..3).forEach { i -> emit(i) }
}
/**
 * Collects [flows_conflate] through conflate, which lets the collector skip intermediate
 * values it was too slow to process.
 */
fun main_conflate() = runBlocking {
    // The author notes that the second value is conflated (dropped).
    // NOTE(review): neither side has a delay here, so which values are dropped depends on scheduling — confirm.
    val conflated = flows_conflate().conflate()
    conflated.collect { println("$it") }
}
// 18. Processing only the latest value (collectLatest)
/** Emits 1..3 back to back, without any delay between values. */
fun flow_collectLatest(): Flow<Int> = flow {
    (1..3).forEach { i -> emit(i) }
}
/**
 * Collects [flow_collectLatest] with collectLatest: the block handling a value is cancelled
 * as soon as a newer value arrives.
 */
fun main_collectLatest() = runBlocking {
    val numbers = flow_collectLatest()
    numbers.collectLatest { latest ->
        delay(300) // 300 ms of work per value; cancelled if a newer value arrives meanwhile
        println("$latest") // only the last value gets this far
    }
}
// 19. Composing multiple flows: zip and combine
// Two source flows of different lengths, used by main_zip.
val one: Flow<Int> = (1..2).asFlow()
val two: Flow<String> = flowOf("4", "5", "6")
/** Combines the flows `one` and `two` pairwise and prints every combined value. */
fun main_zip() = runBlocking {
    // zip pairs values one-to-one and ends with the shorter flow, so "6" is never printed:
    // one.zip(two) { a, b -> "$a->$b" }.collect { value -> println("$value") }

    // combine re-emits whenever either flow produces a value, using the latest value of the other;
    // the author observed 2 being repeated and printed together with 6.
    val combined = one.combine(two) { a, b -> "$a->$b" }
    combined.collect { println("$it") }
}
// 20. Flattening flows
// flatMapConcat: waits for the inner flow to complete before starting to collect the next one.
// flatMapMerge:  collects all incoming flows concurrently and merges their values into a single
//                flow, so values are emitted as soon as possible.
// flatMapLatest: processes only the latest value.
/** Inner flow for the flattening demo: emits "old i", suspends for 1000 ms, then emits "new i". */
fun flow_Flattening(i: Int): Flow<String> {
    return flow {
        emit("old $i")
        delay(1000)
        emit("new $i")
    }
}
/**
 * Feeds 1..3 (100 ms apart) through flatMapLatest with [flow_Flattening] and prints each
 * collected value together with the time elapsed since the start.
 */
fun main_Flattening() = runBlocking {
    val startTime = System.currentTimeMillis()
    val flattened = (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapLatest { flow_Flattening(it) }
    flattened.collect { value ->
        println("$value Collection time ${System.currentTimeMillis() - startTime} ms ")
    }
}
// 21. Flow exceptions
/**
 * Emits 1..3 and maps each number to "string N".
 * The map step deliberately fails for every value greater than 1, so collectors see
 * "string 1" followed by an IllegalStateException whose message names the offending value.
 */
fun flow_catch(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
        .map { value ->
            // The lazy-message lambda must return the message. It used to call println, which
            // returns Unit and produced an exception whose message was "kotlin.Unit".
            check(value <= 1) { "Crashed on $value" }
            "string $value"
        }
/** Walks through the ways of handling the exception thrown by [flow_catch]. */
fun main_catch() = runBlocking<Unit> {
    // 1. Imperative approach: wrap the collection in try/catch.
    // try {
    //     flow_catch().collect { value -> println(value) }
    // } catch (e: Throwable) {
    //     println("Caught $e") // catches any exception
    // }

    // 2. Declarative approach: the catch operator replaces try/catch.
    // flow_catch().catch { e -> emit("Caught $e") }.collect { value -> println("$value") }

    // 3. catch only handles upstream exceptions; an exception thrown inside collect
    //    (downstream of catch) would not be handled by it.
    val recovered = flow_catch().catch { error -> emit("Caught $error") }
    recovered.collect { item ->
        // check(item <= "1") { "Collected $item" }
        println(item)
    }

    // 4. Catching declaratively: move the collector body into onEach, placed before catch,
    //    and finish with a parameterless collect().
    flow_catch()
        .onEach { item ->
            // NOTE(review): item is a String such as "string 1", so this is a lexicographic
            // comparison against "1" and is false for every item — confirm this is intended.
            check(item <= "1") { " Check $item" }
            println(item)
        }
        .catch { error -> println("Caught $error") }
        .collect()
}
// 22. Flow completion: besides finally, there is also onCompletion
/** Emits the number 1 and then fails with a RuntimeException. */
fun flows_oncompletion(): Flow<Int> {
    return flow {
        emit(1) // the exception is thrown right after this value
        throw RuntimeException()
    }
}
/** Collects [flows_oncompletion], reporting the failure in onCompletion and handling it in catch. */
fun main_onCompletion() = runBlocking {
    flows_oncompletion()
        // onCompletion receives a nullable Throwable that tells whether the flow
        // completed normally (null) or with an exception.
        .onCompletion { cause ->
            cause?.let { println(" There are exceptions ") }
        }
        // onCompletion does not handle the exception; it still flows downstream to catch.
        .catch { cause -> println(" Exception is $cause") }
        .collect { value -> println("$value") }
}
// 23. Handling flow events with onEach + collect
/** Prints each event of the flow and only then prints "end ..", because collect() waits. */
fun main_even() = runBlocking {
    val events = (1..3).asFlow()
        .onEach { delay(1000) }
        .onEach { event -> println("$event") } // register a piece of code as a reaction to each event
    events.collect() // suspends until the flow has been collected completely
    println("end ..") // runs only after the collection has finished
}
// 24. Launching a flow with launchIn
/**
 * Starts collecting the flow in a separate coroutine of the runBlocking scope.
 * launchIn returns immediately, so "end .." is printed before any event.
 */
fun main_launchIn() = runBlocking {
    (1..3).asFlow()
        .onEach { delay(1000) }
        .onEach { event -> println("$event") } // register a piece of code as a reaction to each event
        .launchIn(this) // launches the collection in this scope and does NOT wait for it
    println("end ..") // runs first, while the flow is still being collected
}
// Sidebar recommendations (navigation heading from the scraped web page; not part of the example code)
- Eight major trends in the industrial Internet of things (iiot)
- 敏捷之道 | 敏捷开发真的过时了么?
- Coinbase will launch the first encryption derivative for individual investors
- 美国会参议院推进两党枪支安全法案
- 快速了解常用的消息摘要算法,再也不用担心面试官的刨根问底
- CPU process priority
- How to create a new empty branch in the web development process of easyrtc?
- The agile way? Is agile development really out of date?
- 3. Caller 服务调用 - dapr
- Memory introduction
猜你喜欢

常识知识点

这几个默认路由、静态路由的配置部署都不会,还算什么网工!

openGauss内核:简单查询的执行

黄金年代入场券之《Web3.0安全手册》

Vulnerability management mistakes that CIOs still make

国内首款开源MySQL HTAP数据库即将发布,三大看点提前告知

Main steps of system test

How to avoid serious network security accidents?

Internet of things? Come and see Arduino on the cloud

不用Home Assistant,智汀也开源接入HomeKit、绿米设备?
随机推荐
Getting started with the go Cobra command line tool
How stupid of me to hire a bunch of programmers who can only "Google"!
Richard Sutton, the father of reinforcement learning, paper: pursuing a general model for intelligent decision makers
【sdx62】WCN685X IPA不生效问题分析及解决方案
天猫618农产品“百强县” 35个县域来自中西部及东北
Redis scenario
图扑软件数字孪生海上风电 | 向海图强,奋楫争先
[one picture series] one picture to understand Tencent Qianfan ipaas
Why does the kubernetes environment require that bridge NF call iptables be enabled?
39 - read XML node and attribute values
Comparator sort functional interface
One article explains R & D efficiency! Your concerns are
Memory introduction
Eight major trends in the industrial Internet of things (iiot)
Vulnerability management mistakes that CIOs still make
8 lines of code to teach you how to build an intelligent robot platform
kotlin 语言特性
Internet of things? Come and see Arduino on the cloud
Go deep into high-performance JSON parsing libraries in go
kotlin 匿名函数 与 Lambda