import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis
@Volatile // volatile 로 선언한다고 해서 동기화 문제가 해결되지 않는다.
var counter = 0
// 일반적인 해결책으로는 (스레드나, 코루틴이나) thread-safe data structure 를 이용
val counterAtomic = AtomicInteger()
// Thread Confinement :
// an approach to the problem of shared mutable state where all access to the particular
// shared state is confined to a single thread.
// It is typically used in UI applications, where all UI state is confined to the single
// event-dispatch/application thread.
val counterContext = newSingleThreadContext("CounterContext")
var counterConfined = 0
var counterConfinedCoarse = 0
//Mutual Exclusion
val mutex = Mutex()
var counterMutex = 0
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
// define a function that launches an actor using an actor coroutine builder
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
// `CompletableDeferred.complete()`
// Completes this deferred value with a given value.
// The result is true if this deferred was completed as a result of this invocation
// and false otherwise (if it was already completed).
// Subsequent invocations of this function have no effect and always produce false.
// This function transitions this deferred into completed state if it was not
// completed or cancelled yet. However, if this deferred has children, then it transitions
// into completing state and becomes complete once all its children are complete.
}
}
}
fun main() {
runBlocking {
withContext(Dispatchers.Default) {
massiveRun { counter++ }
}
println("Counter = $counter")
}
runBlocking {
withContext(Dispatchers.Default) {
massiveRun { counterAtomic.incrementAndGet() }
}
// 단순한 카운터, 콜렉션, 큐, 다른 표준적인 자료구조들에 대한
// 기본적인 연산인 경우 스레드 세이프한 자료구조를 도입하는게 가장 빠른 해결책이다.
println("Counter Atomic = $counterAtomic")
}
runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// fine-grained thread confinement 이므로 매우 느림
withContext(counterContext) {
// Each individual increment switches from multithreaded
// Dispatchers.Default context to the single-threaded context
// using withContext(counterContext) block
counterConfined++
}
}
}
println("Counter Confined = $counterConfined")
}
// Thread confinement coarse-grained
// 일반적으로 스레드 감금(?)은 큰 덩어리로 일어난다.
// 상태를 업데이트하는 큰 덩어리의 비즈니스 로직들은 하나의 스레드에 감금되어 있다.
runBlocking {
withContext(counterContext) {
massiveRun {
counterConfinedCoarse++
}
}
println("Counter Confined Coarse = $counterConfinedCoarse")
}
runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 각 increment 를 lock 으로 보호한다
mutex.withLock {
// Mutex.lock(), unlock() 을 호출하는 부분이 확장함수 내부에 있다.
// 이 함수는 suspending function 이므로 thread 를 Blocking 하지 않는다.
// blocking 세상에서는 보통 `synchronized` 혹은 `ReentrantLock` 을 사용할 것이다.
counterMutex++
}
}
}
println("Counter Mutex = $counterMutex")
}
runBlocking {
val counter = counterActor()
withContext(Dispatchers.Default) {
massiveRun { counter.send(IncCounter) }
}
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter Actor = ${response.await()}")
counter.close() // shutdown the actor
}
//Indeed, actors may modify their own private state,
// but can only affect each other through messages (avoiding the need for any locks).
}
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val time = measureTimeMillis {
coroutineScope {
repeat(n) {
launch { repeat(k) { action() } }
}
}
}
println("Completed ${n * k} actions in $time ms")
}
실행 결과
Completed 100000 actions in 14 ms
Counter = 49868
Completed 100000 actions in 18 ms
Counter Atomic = 100000
Completed 100000 actions in 369 ms
Counter Confined = 100000
Completed 100000 actions in 4 ms
Counter Confined Coarse = 100000
Completed 100000 actions in 58 ms
Counter Mutex = 100000
Completed 100000 actions in 161 ms
Counter Actor = 100000
요약
- 코루틴 세상에서 공유 자원에 대한 동기화 기법으로 다음과 같은 해결책이 있다.
1. Atomic 을 지원하는 자료구조 사용하기
2. Thread Confinement (하나의 스레드에서만 업데이트 수행하도록 하기)
3. Mutex.withLock { ... } 사용하기 (suspend 함수이므로 스레드를 blocking 하지 않음)
4. Actor 사용하기 (아직 뭔지 잘 모르겠다)
- Actor 개념
An actor is an entity made up of a combination of a coroutine, the state that is confined and encapsulated into this coroutine, and a channel to communicate with other coroutines.
actor { ... } 함수를 이용하면 메시지가 들어오는 메일박스 채널과, 메시지를 보내는 채널을 편리하게 합쳐서 구성할 수 있다.
'Operating System' 카테고리의 다른 글
[OS] Memory Management (3&4) (0) | 2022.01.24 |
---|---|
[OS] Memory Management (2) (0) | 2021.12.24 |
[OS] Memory Management (1) (0) | 2021.12.24 |
[System Software] gcc, compiling, linking, libraries (0) | 2021.02.13 |
[System Software] System Software & Program Execution (0) | 2021.02.12 |