
Deep Dive into Kotlin Coroutines (XIV): The Problem with Shared State

2022-06-23 10:44:00 RikkaTheWorld



Before we start, take a look at the UserDownloader class below. It lets us fetch a user by id and read back all the users downloaded so far. What is wrong with this implementation?

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()

    fun downloaded(): List<User> = users.toList()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.add(newUser)
    }
}

Note the defensive copy toList here: it avoids a conflict between reading the list returned by downloaded and adding an element to the mutable list. We could instead represent users as a read-only list (List<User>) held in a read-write property (var). Then no defensive copy would be needed and downloaded would not need any protection at all, but adding an element to the collection would become slower. I personally prefer that second approach, but I decided to show the one using a mutable collection, because I see it more often in real projects.
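For illustration, here is a minimal sketch of that alternative (my own code, not from the original article): a read-only List<User> kept in a var property, so downloaded needs no defensive copy, at the cost of creating a new list on every addition. Like the original version, it is still not safe for concurrent use, which is the problem discussed next.

class UserDownloader(
    private val api: NetworkService
) {
    private var users = listOf<User>()

    fun downloaded(): List<User> = users

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users = users + newUser // copies the list on every addition
    }
}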

These implementations are not ready for concurrent use. Each call to fetchUser modifies users. As long as the function is never started on more than one thread at the same time, this is fine. But since it can be started on many threads at the same time, users is what we call a shared state, and it needs to be protected, because concurrent modifications can lead to conflicts. The problem is shown below:

class FakeNetworkService : NetworkService {

    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // ~998242
}

Because multiple threads interact with the same instance, the code above prints a number smaller than 1,000,000 (for example 998242), or it may throw an exception:

Exception in thread "main"
java.lang.ArrayIndexOutOfBoundsException: 22
at java.util.ArrayList.add(ArrayList.java:463)

This is a classic problem of modifying a shared state. To see it more clearly, here is a simpler example: many coroutines incrementing one integer. I call massiveRun on Dispatchers.Default; it starts 1,000 coroutines, each performing the increment 1,000 times. After these operations, the result should be 1,000,000 (1,000 * 1,000). However, without any synchronization, the real result is smaller because of conflicts.

var counter = 0

fun main() = runBlocking {
    massiveRun {
        counter++
    }
    println(counter) // ~567231
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

To understand why the result is not 1,000,000, imagine two threads trying to increment the same number at the same time. Say the starting value is 0. The first thread reads the current value 0, and then the CPU switches to the second thread. The second thread also reads 0, increments it to 1, and stores it in the variable. We switch back to the first thread where it left off: it is holding 0, so it increments it to 1 and stores it. The variable ends up as 1, but it should be 2. This is how some of the operations get lost.
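To make the lost update concrete, here is a small sketch (my own illustration, not from the original article) of the three separate steps hidden inside counter++; two threads can both perform the read before either of them writes:

var counter = 0

fun incrementNonAtomically() {
    val read = counter          // threads A and B may both read 0 here
    val incremented = read + 1  // both compute 1
    counter = incremented       // both write 1, so one increment is lost
}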

Blocking synchronization

The problem above can be solved with classic tools known from Java, such as synchronized blocks or synchronized collections.

var counter = 0

fun main() = runBlocking {
    val lock = Any()
    massiveRun {
        synchronized(lock) { // We are blocking threads here!
            counter++
        }
    }
    println("Counter = $counter") // 1000000
}
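The synchronized collections mentioned above could look like the sketch below (my own code, assuming the same UserDownloader shape): add is safe on its own, but operations that iterate, such as toList, still need an explicit synchronized block.

import java.util.Collections

class UserDownloader(
    private val api: NetworkService
) {
    private val users: MutableList<User> =
        Collections.synchronizedList(mutableListOf())

    fun downloaded(): List<User> =
        synchronized(users) { users.toList() }

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.add(newUser) // thread-safe add
    }
}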

These solutions work, but there are a few problems. The biggest one is that you cannot use suspending functions inside a synchronized block. The second is that a synchronized block blocks the thread while a coroutine is waiting for its turn. After the chapter on dispatchers, I hope you understand that we do not want to block threads; one of them might be the main thread. Why waste these resources? We should use coroutine-specific tools instead: ones that suspend or avoid conflicts rather than block. So let us set this solution aside and explore some other options.

Atomics

There is another Java solution that can help us in simple cases. Java has a set of atomic values whose operations are fast and guaranteed to be "thread-safe". They are called atomics. Their operations are implemented at a low level without locks, so this solution is efficient and appropriate for us. There are different atomic values we can use; for our case, AtomicInteger fits.


private var counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.incrementAndGet()
    }
    println(counter.get()) // 1000000
}

It works perfectly here, but the usefulness of atomic values is generally limited, so we need to be careful: knowing that a single operation is atomic does not help us when we have a bundle of operations.

private var counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.set(counter.get() + 1)
    }
    println(counter.get()) // ~430467
}

To make UserDownloader safe, we could use an AtomicReference wrapping a read-only list of users. We can then use the atomic getAndUpdate function to update its value without conflicts.

class UserDownloader(
    private val api: NetworkService
) {
    private val users = AtomicReference(listOf<User>())

    fun downloaded(): List<User> = users.get()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.getAndUpdate { it + newUser }
    }
}

We often use atomics to protect a single primitive or a single reference, but for more complex cases we still need better tools.

A dispatcher limited to a single thread

In the chapter on coroutine dispatchers, we saw a dispatcher limited to a single thread. For most problems with shared state, this is the easiest solution.

val dispatcher = Dispatchers.IO
    .limitedParallelism(1)

var counter = 0

fun main() = runBlocking {
    massiveRun {
        withContext(dispatcher) {
            counter++
        }
    }
    println(counter) // 1000000
}

In practice, this approach can be used in two ways. The first is known as coarse-grained thread confinement. It is the easy approach: we just wrap the whole function with withContext, using a dispatcher limited to a single thread. This solution is simple and eliminates conflicts, but the problem is that the whole function loses its multithreading capabilities. Look at the example below: api.fetchUser(id) could be started concurrently on many threads, but its body will run on a dispatcher limited to a single thread. As a result, the function might slow down when it calls blocking or CPU-intensive operations.

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(1)

    suspend fun downloaded(): List<User> =
        withContext(dispatcher) {
            users.toList()
        }

    suspend fun fetchUser(id: Int) = withContext(dispatcher) {
        val newUser = api.fetchUser(id)
        users += newUser
    }
}

The second approach is known as fine-grained thread confinement. In this approach, we wrap only the statements that actually modify the state; in our example, these are all the lines that use users. This approach is more demanding, but it offers better performance if the code excluded from the critical sections (in our case, the rest of fetchUser) is blocking or CPU-intensive. If that code only consists of plain suspending functions, the performance improvement is unlikely to be significant.

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(1)

    suspend fun downloaded(): List<User> =
        withContext(dispatcher) {
            users.toList()
        }

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        withContext(dispatcher) {
            users += newUser
        }
    }
}

In most cases, using a dispatcher limited to a single thread is not only simple but also efficient, because the standard dispatchers share the same pool of threads.
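A quick way to see this (a sketch of my own, assuming kotlinx.coroutines 1.6+ where limitedParallelism is available) is to print the thread name: a limitedParallelism(1) view does not create a new thread, it borrows one from the shared pool.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

suspend fun main() {
    val single = Dispatchers.Default.limitedParallelism(1)
    withContext(single) {
        // Prints a pool thread name such as "DefaultDispatcher-worker-1"
        println(Thread.currentThread().name)
    }
}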

Mutex

The last popular approach is to use a Mutex. You can imagine the shared state as a room with a single key. The mutex's most important function is lock. When the first coroutine calls lock, it takes the key and passes through without suspension. If another coroutine then calls lock, it is suspended until the first coroutine calls unlock. If yet another coroutine reaches lock, it is also suspended and placed in a queue, right after the second coroutine. When the first coroutine finally calls unlock, it gives the key back, the second coroutine (the first one in the queue) is resumed, and it can finally pass through lock to use the shared state. Therefore, between lock and unlock there is at most one coroutine.

suspend fun main() = coroutineScope {
    repeat(5) {
        launch {
            delayAndPrint()
        }
    }
}

val mutex = Mutex()

suspend fun delayAndPrint() {
    mutex.lock()
    delay(1000)
    println("Done")
    mutex.unlock()
}
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done

Using lock and unlock directly is risky, because any exception (or an early return) between the two would mean the key is never given back (unlock is never called), and then no other coroutine could ever pass through the lock. This is a serious problem known as a deadlock. Instead, we can use the withLock function, which starts with lock and calls unlock in a finally block, so the lock is released even if an exception is thrown inside the block.
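Conceptually, withLock behaves roughly like the sketch below (my own simplification; the real library function also supports an optional owner parameter):

suspend fun <T> Mutex.withLockSketch(action: () -> T): T {
    lock()
    try {
        return action()
    } finally {
        unlock() // always released, even if action throws
    }
}

Using withLock looks similar to using a synchronized block: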

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    massiveRun {
        mutex.withLock {
            counter++
        }
    }
    println(counter) // 1000000
}

The important advantage of a mutex over a synchronized block is that we suspend a coroutine instead of blocking a thread. This is a safer and lighter approach. Compared to a dispatcher limited to a single thread, a mutex is lighter, and in some cases it might offer better performance. On the other hand, it is also harder to use correctly. It has one important danger: a coroutine cannot get past the lock twice. Executing the code below ends in a program state called deadlock; it stays blocked forever:

suspend fun main() {
    val mutex = Mutex()
    println("Started")

    mutex.withLock {
        mutex.withLock {
            println("Will never be printed")
        }
    }
}
// Started
// (runs forever)

The second problem with a mutex is that it is not released when a coroutine is suspended. Look at the code below: it takes over 5 seconds, because the mutex is still held during delay:

class MessagesRepository {
    private val messages = mutableListOf<String>()
    private val mutex = Mutex()

    suspend fun add(message: String) = mutex.withLock {
        delay(1000) // we simulate a network call
        messages.add(message)
    }
}

suspend fun main() {
    val repo = MessagesRepository()
    val timeMillis = measureTimeMillis {
        coroutineScope {
            repeat(5) {
                launch {
                    repo.add("Message$it")
                }
            }
        }
    }
    println(timeMillis) // ~5120
}

We do not have this problem when we use a dispatcher limited to a single thread: when a delay or a network call suspends a coroutine, the thread can be used by other coroutines.

class MessagesRepository {
    private val messages = mutableListOf<String>()
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(1)

    suspend fun add(message: String) =
        withContext(dispatcher) {
            delay(1000) // we simulate a network call
            messages.add(message)
        }
}

suspend fun main() {
    val repo = MessagesRepository()
    val timeMillis = measureTimeMillis {
        coroutineScope {
            repeat(5) {
                launch {
                    repo.add("Message$it")
                }
            }
        }
    }
    println(timeMillis) // 1058
}

This is why we should avoid wrapping whole functions with a mutex (the coarse-grained approach). If we do use one, we need to be very careful not to lock twice and not to call suspending functions while holding the lock.

class MongoUserRepository(
    //...
) : UserRepository {

    private val mutex = Mutex()

    override suspend fun updateUser(
        userId: String,
        userUpdate: UserUpdate
    ): Unit = mutex.withLock {
        // Yes, an update should happen in the database,
        // not here; this is just an example.
        val currentUser = getUser(userId) // Deadlock!
        deleteUser(userId) // Deadlock!
        addUser(currentUser.updated(userUpdate)) // Deadlock!
    }

    override suspend fun getUser(
        userId: String
    ): User = mutex.withLock {
        // ...
    }

    override suspend fun deleteUser(
        userId: String
    ): Unit = mutex.withLock {
        // ...
    }

    override suspend fun addUser(
        user: User
    ): User = mutex.withLock {
        // ...
    }
}

Fine-grained thread confinement (locking only the places where we modify the shared state) would help, but in the example above I would rather use a dispatcher limited to a single thread.
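Here is a sketch of that preference (my own code, written in the same placeholder style as the example above): with a dispatcher limited to a single thread, the nested calls do not deadlock, because nesting withContext on the same dispatcher simply runs the block.

class MongoUserRepository(
    //...
) : UserRepository {

    private val dispatcher = Dispatchers.IO
        .limitedParallelism(1)

    override suspend fun updateUser(
        userId: String,
        userUpdate: UserUpdate
    ): Unit = withContext(dispatcher) {
        val currentUser = getUser(userId) // no deadlock
        deleteUser(userId)
        addUser(currentUser.updated(userUpdate))
    }

    override suspend fun getUser(
        userId: String
    ): User = withContext(dispatcher) {
        // ...
    }

    // deleteUser and addUser are wrapped the same way
}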

Summary

When we modify a shared state, there are many ways to orchestrate coroutines to avoid conflicts. The most practical solution is to modify the shared state on a dispatcher limited to a single thread. This can be fine-grained thread confinement, which wraps only the concrete places where synchronization is needed, or coarse-grained thread confinement, which wraps the whole function. The second approach is easier, but it may be slower. We can also use atomic values or a mutex.


Copyright notice: this article was written by [RikkaTheWorld]. Please include a link to the original when reposting.
https://yzsam.com/2022/174/202206231030573603.html