멀티 스레드 처리는 애플리케이션의 퍼포먼스에 큰 이점을 주지만 경쟁 상태(race condition)를 적절히 조절해 주지 않는다면 데이터의 손실을 야기할 수 있다.
- 경쟁 상태란 여러 개의 스레드가 하나의 공유/변경 가능한 자원에 접근하는 것을 말한다. 자세히 보기
데이터 손실을 막기 위해서는 동기화(synchronization)를 통해 race condition을 제어하여 올바른 멀티 스레드 환경을 구축해야 한다.
그러기 위한 여러 방법 중 하나로 Mutex를 고를 수 있다.
Mutex
Mutex는 공유자원에 변경이 일어나는 순간 적절한 block을 통해 race condition을 제어하는 동기화 제어 기법이다.
코드가 임계 구역에 있는 경우 절대로 동시성이 발생하지 않게 하고 오직 하나의 루틴만 접근하는 것을 보장한다.
- 임계 구역이란 한 번에 하나의 프로세스만 액세스 할 수 있는 코드 영역을 말한다. 자세히 보기
코루틴에서 제공하는 Mutex의 함수들을 통해 간단히 Mutex 방식의 동기화 제어가 가능하다.
private val mutex = Mutex()
private var counter = 0
fun execute() = runBlocking {
GlobalScope.massiveRun {
mutex.withLock {
counter++
}
}
print("Counter = $counter")
}
2021-12-21 11:24:41.770 5435-5435/com.mj.foundation I/System.out: Completed 1000 actions in 36 ms
2021-12-21 11:24:41.770 5435-5435/com.mj.foundation I/System.out: Counter = 1000
Mutex 외에도 Coroutine에서 동기화 처리 방법이 더 존재한다.
Single Thread
말 그대로 하나의 스레드만 사용한다.
싱글 스레드를 사용하도록 코루틴의 context를 지정해주면 데이터의 무결성을 보장하며 결과를 얻어낼 수 있지만
멀티 스레드 처리에 비해 성능이 떨어진다.
@ObsoleteCoroutinesApi
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
@ObsoleteCoroutinesApi
fun execute(){
runBlocking {
GlobalScope.massiveRun {
withContext(counterContext){
counter++
}
}
println("Counter = $counter")
}
}
2021-12-21 11:31:22.254 5986-5986/com.mj.foundation I/System.out: Completed 1000 actions in 846 ms
2021-12-21 11:31:22.254 5986-5986/com.mj.foundation I/System.out: Counter = 1000
Actor
동기화 문제 여지가 있는 자원을 actor 내에서 관리하도록 하고, actor 클래스 멤버 변수로 정의되어 있는 channel을 통해 자원으로 접근이 가능하다.
channel은 FIFO 방식의 Queue 형태를 가지고 있고 순차적인 접근을 보장하여 동기화 처리를 할 수 있다.
sealed class CounterMsg {
object IncreaseCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
}
@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0
for (msg in channel){
when(msg){
is CounterMsg.IncreaseCounter -> counter++
is CounterMsg.GetCounter -> msg.response.complete(counter)
}
}
}
@ObsoleteCoroutinesApi
fun execute(){
runBlocking {
val counter = counterActor()
GlobalScope.massiveRun {
counter.send(CounterMsg.IncreaseCounter)
}
val response = CompletableDeferred<Int>()
counter.send(CounterMsg.GetCounter(response))
println("Counter = ${response.await()}")
counter.close()
}
}
2021-12-21 11:20:09.896 23014-23014/? I/System.out: Completed 1000 actions in 224 ms
2021-12-21 11:20:09.899 23014-23014/? I/System.out: Counter = 1000
예시에서 사용하는 공통 확장 함수
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 10
val k = 100
val time = measureTimeMillis {
val jobs = List(n) {
coroutineScope {
launch {
repeat(k) {
action()
}
}
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
'Kotlin' 카테고리의 다른 글
[Kotlin] 코틀린의 시퀀스(Sequence) (0) | 2022.01.24 |
---|---|
[Kotlin] 코틀린에서 RxJava와 Coroutine (0) | 2022.01.06 |
[Kotlin] inline 함수 (0) | 2021.09.06 |
[Kotlin] Flow (0) | 2021.08.13 |
[Kotlin] 리스트를 통한 명령형 방식과 함수형 방식 비교 (0) | 2021.08.04 |