当前位置:网站首页>Kotlin asynchronous flow
Kotlin asynchronous flow
2022-06-24 13:39:00 【day_ moon】
//1. Indicates multiple values Stream is used to return multiple asynchronously computed values
fun foo(): List<Int> = listOf(1, 2, 3)
fun forEachList() {
foo().forEach { value -> println(value) }
}
//2. Sequence
fun foos(): Sequence<Int> = sequence {// Use a sequence (Sequence) To represent numbers
for (i in 1..3) {
Thread.sleep(1000)// wait for 100 millisecond
yield(i)// The next value is
}
}
fun forEachSequences() {
foos().forEach { value -> println(value) }
}
//3 Suspend function
suspend fun foo_Suspending(): List<Int> {//suspend Decoration does not block the main thread ,List<Int> We can only return all values at once
delay(1000)
return listOf(1, 2, 3)
}
fun main_Suspending() = runBlocking {
foo_Suspending().forEach { value -> println(value) }
}
//4.Flows
fun foo_flows(): Flow<Int> = flow {// The constructor function is named flow No longer mark suspend Modifier
for (i in 1..3) { //flow{...} Code blocks in can be suspended
delay(2000)
emit(i)// Value through emit Function is emitted from the stream
}
}
fun main_flows() = runBlocking<Unit> {
launch {// Used to check whether the main thread is blocked
for (k in 1..3) {
println("k $k")
delay(1000)// wait for 1000 millisecond Does not block the main thread
}
}
foo_flows().collect { value -> println("$value") }// collect Function from flow The value of
}
//5. The flow is cold
fun foo_cold(): Flow<Int> = flow {
for (i in 1..3) {//flow Each collection starts
println("Flow Turn on ")
delay(1000)
emit(i)
}
}
fun main_cold() = runBlocking {
val flows = foo_cold()
println("...$flows")
flows.collect { value -> println("$value") }// First open , Then print the value
println("... collect ")
}
//6. Cancel flow
fun foo_cancel(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
fun main_cancel() = runBlocking {
withTimeoutOrNull(1200) {// Running one will cancel
foo_cancel().collect { value -> println("$value") }
}
println("end")
}
//7. Stream builder asFlow
fun main_asFlow() = runBlocking {
(1..3).asFlow().collect { value -> println("$value") }
}
//8. Intermediate flow operator
suspend fun per_request(requst: Int): String {
delay(1000)
return "$requst"
}
fun main_map() = runBlocking {
(1..3).asFlow()// Build flow
.map { request -> per_request(request) }// Intermediate operators
.collect { value -> println("$value") }
}
//9. Conversion operators
suspend fun per_transform(requst: Int): String {
delay(1000)
return "$requst"
}
fun main_transform() = runBlocking {
(1..3).asFlow()// Build flow
.transform { request ->
emit("request $request")// An asynchronous request is preceded by a string and followed by a response
emit(per_transform(request))
}// Intermediate operators
.collect { value -> println("$value") }
}
//10 Length limiting operator
fun number(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("end")// Run here to close
emit(3)
} finally {
println("finally")
}
}
fun main_take() = runBlocking {
number().take(2).collect { value -> println("$value") }//take Limit length
}
//11. Stream operators
fun main_reduce()= runBlocking {
val sun= (1..3).asFlow()// Tectonic flow
.map { it*it }
.reduce { a, b ->a + b }// Turn the result into addition Then return
println("$sun")
}
//12 The flow is continuous Like an assembly line
fun main_flowd() = runBlocking {
(1..10).asFlow().filter { println(" Request value $it")
it % 2 != 0 }
.map {
println(" The return value is $it")
}.collect { value -> println("$value") }
}
//13 Stream context reservation That is, after There will be value It's preserved
fun main_save()= runBlocking {
(1..5).asFlow().filter {it%2==0}.map {
println(" The return value is $it")
}.collect { value -> println(" Finally back to $value") }
}
//14 Misused withContext
fun flow_withContext():Flow<Int> = flow {
withContext(Dispatchers.Default){// This line reports a mistake Flow invariant is violated
for (i in 1..3){
emit(i)
println("$i")
}
}
}
fun main_withContext()= runBlocking {
flow_withContext().collect { value -> println("$value") }
}
//15 flowOn Operator The context of the flow cannot be changed
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun flow_flowOn():Flow<Int> = flow {
for (i in 1..3){
log("flow_flowOn $i")
emit(i)
}
}.flowOn(Dispatchers.IO)// Execute in child thread
fun main_flowOn()= runBlocking {
log(" Lord ")// Main thread execution
flow_flowOn().collect { value -> log("main_flowOn $value") }// Main thread execution
}
//16 buffer Reduce collection time
fun flow_buffer():Flow<Int> = flow {
for (i in 1..3){
delay(2000)// altogether 6000
emit(i)
}
}
fun main_buffer() = runBlocking {
val time= measureTimeMillis {
flow_buffer().buffer(1).collect { value ->
delay(300)// This step collects It only took 300 many
println("$value") }
}
println(" The time required $time")//6326
}
//17 Merge conflate
fun flows_conflate():Flow<Int> = flow {
for (i in 1..3){
emit(i)
}
}
fun main_conflate() = runBlocking {
flows_conflate().conflate().// The second number is merged ( discarded )
collect { value -> println("$value") }
}
//18 Process latest collectLatest
fun flow_collectLatest():Flow<Int> = flow {
for (i in 1..3){
emit(i)
}
}
fun main_collectLatest()= runBlocking {
flow_collectLatest().collectLatest { value ->
delay(300)// Delay 3 second
println("$value") }// Print only the last
}
//19 Combined flow zip combine The role of combined flow
val one=(1..2).asFlow()
val two= flowOf("4","5","6")
fun main_zip() = runBlocking {
// one.zip(two){a,b->"$a->$b"}.collect { value -> println("$value") }// Don't print 6
one.combine(two){a,b->"$a->$b"}.collect { value -> println("$value") }//2 repeat Then follow 6 Printed together
}
//20 Advection flow
// flatMapConcat Wait for the internal flow to complete , Then start collecting the next stream
//flatMapMerge Collect all incoming streams at the same time and merge their values into a single stream , In order to send the value as soon as possible
//flatMapLatest Process the latest value
fun flow_Flattening(i: Int):Flow<String> = flow {
emit("old $i")
delay(1000)
emit("new $i")
}
fun main_Flattening() = runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }.flatMapLatest { flow_Flattening(it) }.collect { value ->
println("$value Collection time ${System.currentTimeMillis() - startTime} ms ")
}
}
//21 Catch exception flow
//sampleStart
fun flow_catch(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { println("$value") }// Make an exception
"string $value"
}
fun main_catch() = runBlocking<Unit> {
//1. use try catch
// try {
// flow_catch().collect { value -> println(value) }
// } catch (e: Throwable) {
// println("Caught $e")// Catch any exception
// }
//2.catch
// flow_catch().catch { e-> emit("Caught $e") }.collect { value -> println("$value") }// use catch replace
//3. Put the exception in collect That is, downstream It doesn't work
flow_catch().catch { e-> emit("Caught $e") }.collect { value ->
// check(value <= "1") { "Collected $value" }//
println(value)
}
//4. Catch exceptions declaratively
flow_catch().onEach { value ->
check(value <= "1") { " Check $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}//22. Flow complete except finally Outside , also onCompletion
fun flows_oncompletion():Flow<Int> = flow {
emit(1)// Send numbers 1 Exception thrown after
throw RuntimeException()
}
fun main_onCompletion() = runBlocking {
flows_oncompletion().onCompletion {//onCompletion There is one Throwable Parameters can be used to determine whether the stream collection is completed normally or abnormally
cause ->if (cause!=null) println(" There are exceptions ") }
.catch { //onCompletion Operator does not handle exceptions The anomaly will still flow downstream
cause -> println(" Exception is $cause") }.collect { value -> println("$value") }
}
//23 Start the flow
fun main_even() = runBlocking {
(1..3).asFlow().onEach { delay(1000) }.onEach { event-> println("$event") // Register a piece of code as a response to an incoming event
}.collect( )// Wait for the collection stream to complete
println("end ..")// When the collection is completed To run this
}
//24 Start the flow
fun main_launchIn() = runBlocking {
(1..3).asFlow().onEach { delay(1000) }.onEach { event-> println("$event") // Register a piece of code as a response to an incoming event
}.launchIn( this)// Wait for the collection stream to complete
println("end ..")// Run this first
}边栏推荐
- Nifi from introduction to practice (nanny level tutorial) - environment
- Quickly understand the commonly used message summarization algorithms, and no longer have to worry about the thorough inquiry of the interviewer
- 源码解析 Handler 面试宝典
- kotlin 匿名函数 与 Lambda
- MySQL interview questions
- 初中级开发如何有效减少自身的工作量?
- Internet of things? Come and see Arduino on the cloud
- Geological disaster early warning monitoring RTU
- Kotlin initialization block
- The second phase of freshman engineering education seminar is to enroll in the China 100 school peer program
猜你喜欢
随机推荐
Eight major trends in the industrial Internet of things (iiot)
发扬连续作战优良作风 全力以赴确保北江大堤安全
kotlin 组合挂起函数
kotlin 数组、集合和 Map 的使用
Use abp Zero builds a third-party login module (I): Principles
源碼解析 Handler 面試寶典
Nifi from introduction to practice (nanny level tutorial) - environment
The data value reported by DTU cannot be filled into Tencent cloud database through Tencent cloud rule engine
【sdx62】WCN685X IPA不生效问题分析及解决方案
Party, Google's autoregressive Wensheng graph model
黄金年代入场券之《Web3.0安全手册》
SYSTEMd common component description
kotlin 关键字 扩展函数
39 - read XML node and attribute values
Creation and use of unified links in Huawei applinking
开发者调查:Rust/PostgreSQL 最受喜爱,PHP 薪水偏低
10 reduce common "tricks"
Express 100 Express query interface (API) interface specification document - detailed version
Evolution of the message module of the play live series (3)
These default routes and static routes can not be configured and deployed. What kind of network workers are they!








