Operating System

[Kotlin] Shared Mutable State & Concurrency

Sara.H 2022. 10. 30. 01:01
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

@Volatile // volatile 로 선언한다고 해서 동기화 문제가 해결되지 않는다.
var counter = 0

// 일반적인 해결책으로는 (스레드나, 코루틴이나) thread-safe data structure 를 이용
val counterAtomic = AtomicInteger()

// Thread Confinement :
// an approach to the problem of shared mutable state where all access to the particular
// shared state is confined to a single thread.
// It is typically used in UI applications, where all UI state is confined to the single
// event-dispatch/application thread.
val counterContext = newSingleThreadContext("CounterContext")
var counterConfined = 0
var counterConfinedCoarse = 0

//Mutual Exclusion
val mutex = Mutex()
var counterMutex = 0

sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

// define a function that launches an actor using an actor coroutine builder
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)

            // `CompletableDeferred.complete()`
            // Completes this deferred value with a given value.
            // The result is true if this deferred was completed as a result of this invocation
            // and false otherwise (if it was already completed).
            // Subsequent invocations of this function have no effect and always produce false.
            // This function transitions this deferred into completed state if it was not
            // completed or cancelled yet. However, if this deferred has children, then it transitions
            // into completing state and becomes complete once all its children are complete.
        }
    }
}


fun main() {
    runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun { counter++ }
        }
        println("Counter = $counter")
    }

    runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun { counterAtomic.incrementAndGet() }
        }
        // 단순한 카운터, 콜렉션, 큐, 다른 표준적인 자료구조들에 대한
        // 기본적인 연산인 경우 스레드 세이프한 자료구조를 도입하는게 가장 빠른 해결책이다.
        println("Counter Atomic = $counterAtomic")
    }

    runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun {
                // fine-grained thread confinement 이므로 매우 느림
                withContext(counterContext) {
                    // Each individual increment switches from multithreaded
                    // Dispatchers.Default context to the single-threaded context
                    // using withContext(counterContext) block
                    counterConfined++
                }
            }
        }
        println("Counter Confined = $counterConfined")
    }

    // Thread confinement coarse-grained
    // 일반적으로 스레드 감금(?)은 큰 덩어리로 일어난다.
    // 상태를 업데이트하는 큰 덩어리의 비즈니스 로직들은 하나의 스레드에 감금되어 있다.
    runBlocking {
        withContext(counterContext) {
            massiveRun {
                counterConfinedCoarse++
            }
        }
        println("Counter Confined Coarse = $counterConfinedCoarse")
    }

    runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun {
                // 각 increment 를 lock 으로 보호한다
                mutex.withLock {
                    // Mutex.lock(), unlock() 을 호출하는 부분이 확장함수 내부에 있다.
                    // 이 함수는 suspending function 이므로 thread 를 Blocking 하지 않는다.
                    // blocking 세상에서는 보통 `synchronized` 혹은 `ReentrantLock` 을 사용할 것이다.
                    counterMutex++
                }
            }
        }
        println("Counter Mutex = $counterMutex")
    }

    runBlocking {
        val counter = counterActor()
        withContext(Dispatchers.Default) {
            massiveRun { counter.send(IncCounter) }
        }
        val response = CompletableDeferred<Int>()
        counter.send(GetCounter(response))
        println("Counter Actor = ${response.await()}")
        counter.close() // shutdown the actor
    }
    //Indeed, actors may modify their own private state, 
    // but can only affect each other through messages (avoiding the need for any locks).
}

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100
    val k = 1000
    val time = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch { repeat(k) { action() } }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

 

실행 결과

Completed 100000 actions in 14 ms
Counter = 49868
Completed 100000 actions in 18 ms
Counter Atomic = 100000
Completed 100000 actions in 369 ms
Counter Confined = 100000
Completed 100000 actions in 4 ms
Counter Confined Coarse = 100000
Completed 100000 actions in 58 ms
Counter Mutex = 100000
Completed 100000 actions in 161 ms
Counter Actor = 100000

 

요약

- 코루틴 세상에서 공유 자원에 대한 동기화 기법으로 다음과 같은 해결책이 있다.

1. Atomic 을 지원하는 자료구조 사용하기

2. Thread Confinement (하나의 스레드에서만 업데이트 수행하도록 하기)

3. Mutex.withLock { ... } 사용하기 (suspend 함수이므로 스레드를 blocking 하지 않음)

4. Actor 사용하기 (아직 뭔지 잘 모르겠다)

 

- Actor 개념

An actor is an entity made up of a combination of a coroutine, the state that is confined and encapsulated into this coroutine, and a channel to communicate with other coroutines.

actor { ... } 함수를 이용하면 메시지가 들어오는 메일박스 채널과, 메시지를 보내는 채널을 편리하게 합쳐서 구성할 수 있다.