๐ค
๊ฐ๋ 8๏ธโฃ Shared mutable state and concurreny
์ฝ๋ฃจํด์ Dispatchers.Default์ ๊ฐ์ ๋ฉํฐ์ค๋ ๋ Dispatcher๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๋ ฌ๋ก ์ํ๋๋ค. ์ด๋ฌํ ์ฝ๋ฃจํด์์ ๋ณ๋ ฌ๋ก ๋ฌด์ธ๊ฐ๋ฅผ ์ํํ ๋ ์ฃผ์ํด์ผํ ๋ฌธ์ ๊ฐ ์๋๋ฐ, ๋ฐ๋ก shared mutable state์ ์ ๊ทผํ ๋์ synchronization ๋ฌธ์ ์ด๋ค.
Problem ๐ฟ
์์ฒ๋ฒ ๊ฐ์ ํ๋์ ๋ฐ๋ณตํ๋ ๋ฐฑ๊ฐ์ ์ฝ๋ฃจํด์ launch
ํ๋ค๊ณ ํด๋ณด์.
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
์ด๋ฌํ suspend ํจ์์ counter๋ผ๋ shared mutable variable๋ฅผ ์ฆ๊ฐ์ํค๋ ๊ฐ๋จํ action์ ํ๋ผ๋ฏธํฐ๋ก ๋ฃ์ด์ ๋๋ ค๋ณด์. ์ฌ๊ธฐ์๋ Dispatcher๋ก Dispatcher.Default๋ฅผ ์ฌ์ฉํ๋ค.
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
์์ ์ฝ๋๋ฅผ ๋๋ ธ์ ๋ ์ด๋ค ๊ฐ์ด ์ถ๋ ฅ๋ ๊น??๐ฏ ๋จ์ํ๊ฒ ์๊ฐํ๋ฉด ์๋์ ๊ฐ์ ๊ฐ๋ค์ด ์ถ๋ ฅ๋ ๊ฒ ๊ฐ๋ค.
// ๋จ์ํ ์๊ฐํ์ ๋์ ์ถ๋ ฅ๊ฐ
Completed 100000 actions in 23 ms
Counter = 100000
ํ์ง๋ง, ์ค์ ๋ก๋ Counter์ ๊ฐ์ด ์์๋ ์ ํ ๋ค๋ฅธ ๊ฐ์ด ๋์ฌ ๊ฒ์ด๋ค!! ์๋ํ๋ฉด 100๊ฐ์ ์ฝ๋ฃจํด์ด counter
๋ผ๋ ๋ณ์๋ฅผ synchronization ํ์ง ์๊ณ ๋ฉํฐ์ค๋ ๋๋ก ๋์์ ์ฆ๊ฐ์์ผฐ๊ธฐ ๋๋ฌธ์ด๋ค.
๋ฐ๋ผ์ ์ค์ ์ถ๋ ฅ๊ฐ์ ์๋์ ๊ฐ์ด ๋์ค๊ฒ ๋๋ค.
Completed 100000 actions in 23 ms
Counter = 85519
โก๏ธ Volatiles๋ก๋ ํด๊ฒฐ์ด ์๋ ๊น..?๐ค
๊ทธ๋ ๋ค๋ฉด ํน์ volatile
๋ณ์๊ฐ concurrency ๋ฌธ์ ๋ฅผ ํด๊ฒฐํด์ฃผ์ง ์์๊น..?
์์ ์ฝ๋์ counter ๋ณ์์ Volatile
์ด๋
ธํ
์ด์
์ ๋ถ์ฌ์ ๋ค์ ์ฝ๋๋ฅผ ๋๋ ค๋ณด์!!
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
์ฝ๋๊ฐ ์์ ๋ณด๋ค ๋ ๋๋ฆฌ๊ฒ ๋์๊ฐ๊ธฐ๋ ํ์ง๋ง, ์ฌ์ ํ Counter๊ฐ 100000์ผ๋ก ์ฐํ์ง๋ ์๋๋ค. ์๋ํ๋ฉด volatile ๋ณ์๊ฐ ํด๋น ๋ณ์์ linearizable(=atomic)ํ read & write๋ฅผ ๋ณด์ฅํด์ฃผ๊ธด ํ์ง๋ง, ๋ ํฐ action๋ค์๊ฒ atomocity๋ฅผ ์ ๊ณตํ์ง ์๊ธฐ ๋๋ฌธ์ด๋ค.
๐ Thread-safeํ ์๋ฃ๊ตฌ์กฐ๋ค
thread-safeํ ์๋ฃ๊ตฌ์กฐ๋ฅผ ์ฌ์ฉํ๋ค๋ฉด (1)์ค๋ ๋์ (2)์ฝ๋ฃจํด์ ํด๊ฒฐ์ฑ ์ด ๋ ์ ์๋ค. thread-safeํ ์๋ฃ๊ตฌ์กฐ๋ shared state๋ฅผ ์ด์ฉํ๋ ์ฐ์ฐ์ ์ํ synchronization ๊ธฐ๋ฅ์ ์ ๊ณตํด์ฃผ๋ฉฐ, synchronized, linearizable, atomic์ด ์ด์ ํด๋นํ๋ค.
1๏ธโฃ AtomicInteger
๊ฐ๋จํ counter์ผ ๊ฒฝ์ฐ์๋ AtomicInteger
๋ฅผ ์ธ ์ ์์ผ๋ฉฐ, AtomicInteger
๋ incrementAndGet()
๋ฉ์๋๋ฅผ ๊ฐ์ง๋ค.
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
Completed 100000 actions in 26 ms
Counter = 100000
AmomicInteger๋ฅผ ์ฐ๋ฉด ์์์ Volatile ๋ณ์๋ฅผ ์ผ์ ๋์๋ ๋ฌ๋ฆฌ counter๊ฐ 100000๊ฐ ์ ํํ ์ฐํ๋ ๊ฒ์ ๋ณผ ์ ์๋. AtomicInteger๋ ๋ณดํต์ counter์ collection, queue ๊ทธ๋ฆฌ๊ณ ๋ค๋ฅธ ํ์ค ์๋ฃ๊ตฌ์กฐ๋ ๊ธฐ๋ณธ ์ฐ์ฐ์์ ๋๊ธฐํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํด์ค๋ค.
๊ทธ๋ฌ๋, thread-safe๋ฅผ ์ํ implementation์ด ๋์ง์์ ๋ณต์กํ state๋ ๋ณต์กํ ์ฐ์ฐ์ ๊ณ์ฐํ๋ ๊ฒ์ ์ด๋ ค์ด ์ผ์ด๋ค.
2๏ธโฃ Thread confinement fine-grained
Thread confinement, ์ฆ ์ค๋ ๋๋ฅผ ์ ํํ๋ ๋ฐฉ๋ฒ์ ์ฐ๋ฉด shared mutable state์ ๋๊ธฐํ ๋ฌธ์ ์ ๋๋ค๋ฅธ ํด๊ฒฐ์ฑ ์ด ๋ ์ ์๋ค. ์ด ๋ฐฉ์์ ํน์ shared state๋ก ์ ๊ทผํ๋ ๋ชจ๋ ๊ฒ๋ค์ด ๋จ์ผ ์ค๋ ๋๋ฅผ ์ด์ฉํ๊ฒ ํ๋ ๊ฒ์ด๋ค. ์ฃผ๋ก UI ์ดํ๋ฆฌ์บ์ด์ ์์ ์ฌ์ฉ๋๋๋ฐ, ์ฌ๊ธฐ์ ๋ชจ๋ UI state๋ single event-dispatch/application ์ค๋ ๋๋ก ์ ํ๋๋ค. ์ฝ๋ฃจํด์์ single-thread context ์ฌ์ฉํจ์ผ๋ก์จ ์ ์ฉํ ์ ์๋ค.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// confine each increment to a single-threaded context
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
Completed 100000 actions in 2190 ms
Counter = 100000
์์ ์ฝ๋๋ ๊ฐ๊ฐ์ increment๋ฅผ single-thread๋ก ์ ํํ๋ fine-grained thread-confinement๋ฅผ ์ํํ๊ธฐ ๋๋ฌธ์ ๋งค์ฐ ๋๋ฆฌ๊ฒ ๋์ํ๋ค. ๊ฐ๊ฐ์ ++์ withContext ๋ธ๋ก์ ์ด์ฉํ์ฌ multi-threaded Dispatchers.Default ์ปจํ ์คํธ์์ single-threaded ์ปจํ ์คํธ๋ก ์ ํ์ํจ๋ค.
3๏ธโฃ Thread confinement coarse-grained
thread-confinement๋ ์๋ฅผ ๋ค์ด์, state๋ฅผ ์ ๋ฐ์ดํธ ์ํค๋ ๋น์ฆ๋์ค ๋ก์ง์ ์ฑ๊ธ ์ค๋ ๋๋ก ์ ํ์ํค๋ ๊ฒฝ์ฐ์ ๊ฐ์ด ํฐ ๋ฉ์ด๋ฆฌ์์ ์ฌ์ฉ๋๋ค. ์๋์ ์์ ์์๋ single thread ์ปจํ ์คํธ์์ ๊ฐ ์ฝ๋ฃจํด์ ์คํํ๋ฉด์ ์์ํ๋ค.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// confine everything to a single-threaded context
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
์์ ์ฝ๋๋ ์์์์ increment๋ง๋ค single-thread๋ก ์ ํํ๋ fineํ ๊ฒฝ์ฐ์๋ ๋ฌ๋ฆฌ, coarseํ๊ฒ massiveRun ๋ญํ ์ด๋ฅผ single-thread๋ก ์ ํํจ์ผ๋ก์จ ๋ ๋น ๋ฅด๊ฒ ๋์ํ๊ณ ์ ํํ ๊ฒฐ๊ณผ๋ฅผ ๋ด๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
4๏ธโฃ Mutual exclusion
์ํธ ๋ฐฐ์ ๋ ๋์์ ์ ๊ทผํด์๋ ์ ๋๋ก ์๋๋ ์๊ณ์์ญ์์ shared state๋ฅผ ์์ ํ ๋ ์ธ ์ ์๋ ๋ฐฉ๋ฒ์ด๋ค. blocking์ ํ ๋ ๊ฐ๋ฐ์๋ค์ ๋ณดํต synchronized
๋ ReentrantLock
๋ฅผ ์ด์ฉํ๋ค. ์ฝ๋ฃจํด์ ๋ค๋ฅธ ํด๊ฒฐ์ฑ
์ผ๋ก๋ Mutex๊ฐ ์๋ค. Mutex๋ ์๊ณ์์ญ์ ์ํ๋ฅผ ๊ตฌ๋ถํ๊ธฐ ์ํด์ lock ๋ฉ์๋์ unlock ๋ฉ์๋๋ฅผ ๊ฐ์ง๊ณ ์๋ค. Mutex.lock()๋ฉ์๋๋ suspending ํจ์์ด๋ฉฐ thread๋ฅผ blockํ์ง ์๋๋ค๋ ํน์ง์ด ์๋ค.
withLock์ด๋ผ๋ ํ์ฅํจ์๋ฅผ ์ฌ์ฉํ๋ฉด mutex.lock(); try { ... } finally { mutex.unlock() }
ํจํด์ ๊ฐ๋จํ ์ ์ฉํ ์ ์๋ค.
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// protect each increment with lock
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
์์ ์์ ๋ fine-grainedํ locking ์์ ๋ผ์ ๋น์ฉ์ด ๋ง์ด ๋ ๋ค. ๊ทธ๋ฌ๋, ์ฃผ๊ธฐ์ ์ผ๋ก shared state๋ฅผ ๋ณ๊ฒฝํด์ผ ํ์ง๋ง ํด๋น state๋ฅผ confine์ํฌ ์ค๋ ๋๊ฐ ๊ธฐ์กด์ ์ค๋ ๋ ํ์ ๋ง๋ ํ ์๋ ๊ฒฝ์ฐ์๋ ์ข์ ํด๊ฒฐ์ฑ ์ด ๋ ์ ์๋ค.
5๏ธโฃ Actors
actor๋ state(์ฝ๋ฃจํด์ confine๋๊ณ ์บก์ํ๋๋)์ channel(๋ค๋ฅธ ์ฝ๋ฃจํด๋ค๊ณผ ์ปค๋ฎค๋์ผ์ด์
ํ๋)๋ก ์ด๋ฃจ์ด์ง ์ฝ๋ฃจํด entity์ด๋ค. ๊ฐ๋จํ actor๋ ๊ทธ๋ฅ ํจ์๋ก ์ธ ์ ์์ง๋ง, ๋ณต์กํ state๋ฅผ ๊ฐ์ง actor๋ ํด๋์ค๋ก ์์ฑํ๋ ๊ฒ์ด ๋ ์ ํฉํ๋ค.
actor์ ์ฝ๋ฃจํด ๋น๋๋ ์กํฐ์ ๋ํ ๋จ์ผ ์ฐธ์กฐ ํธ๋ค๋ก ์ฌ์ฉํ ์ ์๊ฒ ํ๊ธฐ ์ํด์ (1) ๋ฉ์์ง๋ฅผ ์์ ํ๊ธฐ ์ํด์ actor์ mailbox channel์ scope์ ๊ฒฐํฉํ๋ค. ๊ทธ๋ฆฌ๊ณ (2) ์ ์ก ์ฑ๋์ ๊ฒฐ๊ณผ job object์ ๊ฒฐํฉํ๋ค.
actor๋ฅผ ์ฐ๊ธฐ ์ํด ์ฒซ ๋ฒ์งธ๋ก ํ ์ผ์ actor๊ฐ ์ฒ๋ฆฌํ ๋ฉ์์ง ํด๋์ค๋ฅผ ์ ์ํ๋ ๊ฒ์ด๋ค. ์ฝํ๋ฆฐ์ sealed class๊ฐ ์ด๋ฌํ ๋ฉ์์ง ํด๋์ค๋ฅผ ์ ์ํ ๋ ์ ํฉํ๋ค. ๋ฐ์ ์์ ์์๋ CounterMsg sealed class๋ฅผ ์ด์ฉํ์ฌ IncCounter ๋ฉ์์ง์ GetCounter ๋ฉ์์ง๋ฅผ ์ ์ํ์๋ค. IncCountr ๋ฉ์์ง๋ counter๋ฅผ ์ฆ๊ฐ์ํจ๋ค. ๊ทธ๋ฆฌ๊ณ GetCounter๋ ๊ฐ์ getํด์ค๋ค. GetCounter๋ response๋ฅผ ๋ณด๋ด์ผ ํ๋ฏ๋ก ํ๋ผ๋ฏธํฐ๋ก ๋ฌ๋ ค์๋ค. CompletedDeffered
(single value๋ฅผ ๋ํ๋)๊ฐ response๋ก ์ฌ์ฉ๋๊ฒ ๋๋ค.
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
๊ทธ๋ฌ๊ณ ๋์, actor ์ฝ๋ฃจํด ๋น๋๋ฅผ ์ฌ์ฉํ์ฌ actor๋ฅผ launchํ๋ ํจ์๋ฅผ ์ ์ํ๋ค.
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
์ด๋ฅผ ์ด์ฉํ์ฌ ๋ฉ์ธ ํจ์๋ฅผ ์์ฑํด๋ณด์.
fun main() = runBlocking<Unit> {
val counter = counterActor() // create the actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
Completed 100000 actions in 1016 ms
Counter = 100000
์ด๋ค context์์ acter๊ฐ ์ํ๋๋๋๋ ์ค์ํ์ง ์๋ค. actor ์์ฒด๊ฐ ์ฝ๋ฃจํด์ด๊ณ , ์ฝ๋ฃจํด์ ์์ฐจ์ ์ผ๋ก ์คํ๋์ด์ ํน์ ์ฝ๋ฃจํด์ผ๋ก state๋ฅผ ํ์ ํ๋ ๊ฒ์ shared mutable state์ ๋ฌธ์ ์ ์ ๋ํ ํด๊ฒฐ์ฑ
์ด ๋ ์ ์๋ค. actor๋ private state๋ฅผ ๋ณ๊ฒฝ์ํฌ ์ง๋ ๋ชจ๋ฅด์ง๋ง, lockํ ํ์ ์์ด message๋ฅผ ํตํด์ ์๋ก ์ํฅ์ ๋ผ์น ๋ฟ์ด๋ค.
์์ ์ํฉ์์๋ Actor๊ฐ ํญ์ ์์
ํ ๊ฒ์ด ์๊ณ ๋ค๋ฅธ context๋ก ์ ํ์ํฌ ํ์๊ฐ ์๊ธฐ ๋๋ฌธ์, Actor๊ฐ load ํ์ locking์ ์ํํ๋ ๊ฒ๋ณด๋ค ๋ ํจ์จ์ ์ธ ๋ฐฉ๋ฒ์ด๋ค.
[์ฐธ๊ณ ์ฌ์ดํธ]
Kotlin ๊ณต์ ๋ฌธ์ - Shared mutable state and concurrency