当前位置:网站首页>Kotlin coroutine channels

Kotlin coroutine channels

2022-06-24 13:53:00 day_ moon

// 1. Channels: a way to transfer a stream of values between coroutines
/**
 * Sends 2, 4 and 6 from a child coroutine over a channel and prints each value
 * as it is received, followed by a final "end .." line.
 */
fun main_channel() = runBlocking {
    val ints = Channel<Int>() // channel shared by the sender below and the receiver loop
    launch {
        (1..3).forEach { n -> ints.send(n + n) } // sends 1+1, 2+2, 3+3
        ints.close() // marks the end of the stream; elements already sent are still delivered
    }
    // Exactly three receives, matching the three sends above.
    // Iterating with `for (y in ints) println(y)` would also work because the sender closes the channel.
    for (round in 1..3) {
        val value = ints.receive()
        println(" What was received was $value")
    }
    println("end ..")
}

// 2. Building channel producers
/**
 * Starts a producer coroutine in this scope that sends 2, 4 and 6.
 *
 * @return the receiving side of the produced channel
 */
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {
    (1..3).forEach { n -> send(n + n) }
}

/** Prints every value emitted by [producer], then prints "end ..". */
fun main_produce() = runBlocking {
    // Consume the producer's channel element by element until it completes.
    producer().consumeEach { value ->
        println("$value")
    }
    println("end ..")
}

// 3. Pipelines
/**
 * Starts a producer that sends an endless stream of integers: 1, 2, 3, ...
 * The loop never finishes on its own, so the coroutine must be cancelled by its scope.
 */
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce<Int> {
    var next = 1
    while (true) {
        send(next)
        next += 1
    }
}

/**
 * Pipeline stage that receives every value from [numbers] and sends its square downstream.
 *
 * The previous implementation sent `x + x` (the double), which contradicted the
 * function's name; it now sends `x * x`.
 *
 * @param numbers upstream channel to read from
 * @return a channel carrying the square of each upstream value, in order
 */
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
    produce {
        // Receive first, then send the processed value.
        for (x in numbers) send(x * x)
    }

/**
 * Wires [produceNumbers] into [square], prints the first three values that come
 * out of the pipeline, then cancels the still-running pipeline coroutines.
 */
fun main_pip() = runBlocking {
    val source = produceNumbers() // first stage: endless 1, 2, 3, ...
    val processed = square(source) // second stage: transforms each value from the first stage
    for (round in 1..3) {
        println(processed.receive())
    }
    println("end ..") // done with the pipeline output
    // Both stages loop forever; cancel the child coroutines so runBlocking can return.
    coroutineContext.cancelChildren()
}
 // 4. Prime numbers with a pipeline
 /**
  * Prints ten primes by taking the head of the current stream and then wrapping
  * the stream in a new [filters] stage that drops multiples of that head.
  */
 fun main_filters() = runBlocking {
     var stage = numberssend(2) // candidate stream starting at 2: 2, 3, 4, ...
     for (round in 0 until 10) { // take ten numbers
         val prime = stage.receive()
         println(" $prime")
         stage = filters(stage, prime) // chain one more filter for the prime just found
     }
     coroutineContext.cancelChildren() // cancel all children to let main finish
 }
/**
 * Starts a producer that sends an endless ascending stream of integers.
 *
 * @param start first value sent; each following value is one larger
 */
fun CoroutineScope.numberssend(start: Int) = produce<Int> {
    var current = start
    while (true) {
        send(current)
        current += 1
    }
}
/**
 * Pipeline stage for the prime sieve: forwards only the values from [numbers]
 * that are NOT divisible by [prime].
 *
 * @param numbers upstream channel of candidates
 * @param prime divisor whose multiples are dropped
 */
fun CoroutineScope.filters(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (candidate in numbers) {
        if (candidate % prime == 0) continue // multiple of prime: drop it
        send(candidate)
    }
}
// 5. Fan-out: several coroutines receive from the same channel
/**
 * Starts one producer and five processors that all read from it, lets them run
 * for 950 ms, then cancels the producer.
 */
fun mainFanout() = runBlocking<Unit> {
    val numbers = produceNumber()
    for (id in 0 until 5) {
        launchProcessor(id, numbers)
    }
    delay(950)
    // Cancelling the producer's channel ends the processors' receive loops as well.
    numbers.cancel()
}

/** Starts a producer that sends 1, 2, 3, ... and pauses 100 ms after each send. */
fun CoroutineScope.produceNumber() = produce<Int> {
    var counter = 1
    while (true) {
        send(counter) // starts at 1 and grows by one each round
        counter++
        delay(100) // wait 100 ms before producing the next number
    }
}

/**
 * Launches a coroutine that prints every message it receives from [channel],
 * tagged with the processor's [id].
 *
 * @param id label used in the printed line
 * @param channel source of messages; the loop ends when the channel stops yielding elements
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        val line = "Processor #$id received $msg"
        println(line)
    }
}
// 6. Fan-in: several coroutines send to the same channel
/**
 * Starts two senders ("foo" every 200 ms, "BAR!" every 500 ms) that write to one
 * channel, prints the first six strings received, then cancels the senders.
 */
fun main_in() = runBlocking {
    val shared = Channel<String>()
    // One sender coroutine per (text, period) pair, launched in this order.
    listOf("foo" to 200L, "BAR!" to 500L).forEach { (text, period) ->
        launch { sendString(shared, text, period) }
    }
    for (round in 1..6) { // receive first six
        println(shared.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

/**
 * Sends [s] to [channel] forever, waiting [time] milliseconds before each send.
 * Never returns normally; it ends only when the calling coroutine is cancelled
 * or a suspending call throws.
 *
 * @param channel destination channel
 * @param s string to send repeatedly
 * @param time delay before each send, in milliseconds
 */
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    do {
        delay(time)
        channel.send(s)
    } while (true)
}
// 7. Buffered channels
/**
 * Shows a sender filling a buffered channel that nobody reads from: the sender
 * prints before each send, runs for 100 ms, and is then cancelled.
 */
fun main_Buffered() = runBlocking<Unit> {
    val buffered = Channel<Int>(4) // buffer capacity of 4
    val sender = launch { // start the sending coroutine
        for (n in 0 until 10) {
            // Nothing receives, so the sender is expected to print 0-4 and then suspend in send.
            println(" Data sent  $n")
            buffered.send(n)
        }
    }
    delay(100) // wait 100 ms (not one second)
    sender.cancel() // cancel the sender coroutine
}
// 8. Channels are fair
/**
 * The ball passed between players over the table channel.
 *
 * @property hits number of times the ball has been hit; mutable so a player can increment it
 */
data class Ball(
    var hits: Int
)
/**
 * Ping-pong demo: two players share one table channel, the ball is served once,
 * and the game is cancelled after one second.
 */
fun main_fair() = runBlocking {
    val table = Channel<Ball>() // a shared table
    // Launch "ping" first, then "pong".
    for (name in listOf("ping", "pong")) {
        launch { player(name, table) }
    }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}
/**
 * Repeatedly takes a ball from [table], increments its hit counter, prints it,
 * waits 300 ms and puts the same ball back on the table.
 *
 * @param name label printed in front of the ball
 * @param table channel used both to receive the ball and to send it back
 */
suspend fun player(name: String, table: Channel<Ball>) {
    val incoming = table.iterator()
    while (incoming.hasNext()) { // receive the ball in a loop
        val ball = incoming.next()
        ball.hits += 1
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
// 9. Ticker channels
/**
 * Polls a 100 ms ticker channel with several timeouts and prints what each poll
 * returned (`null` when the timeout elapsed before an element arrived).
 */
fun main_Ticker() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel

    // Waits at most timeoutMs for the next tick; yields null if none arrives in time.
    suspend fun poll(timeoutMs: Long) = withTimeoutOrNull(timeoutMs) { tickerChannel.receive() }

    val first = poll(1)
    println("Initial element is available immediately: $first") // initialDelayMillis is 0

    val tooEarly = poll(50) // subsequent elements are spaced by the 100 ms delay
    println("Next element is not ready in 50 ms: $tooEarly")

    val onTime = poll(60)
    println("Next element is ready in 100 ms: $onTime")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)

    // Next element is available immediately
    val afterPause = poll(1)
    println("Next element is available immediately after large consumer delay: $afterPause")

    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    val catchUp = poll(60)
    println("Next element is ready in 50ms after consumer pause in 150ms: $catchUp")

    tickerChannel.cancel() // indicate that no more elements are needed
}
原网站

版权声明
本文为[day_ moon]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/175/202206241048331171.html