9 minute read

해당 아티클은 다음 수준의 지식을 요구합니다.

  • 고루틴을 사용해본 경험
  • golang 채널 사용 방법

다음은 다루지 않습니다.

  • select 문을 활용하는 방법

다음 실행환경에서 동작합니다.

  • wsl2 Ubuntu-22.04
  • go1.22.2 linux/amd64

이번에는 atomic 이외에도 가장 기본적이고 중요한 synchronization primitive인 mutex, rwmutex, waitgroup에 대해서 알아보겠습니다. 다른 언어에도 많이 사용되는 개념인 mutex, rwmutex는 golang 내부에서 어떻게 구현되어 있을까요? 세 가지 구조체는 내부가 거의 비슷하고 이해하기 어렵지 않게(state 변화를 무시하면…) 구현되어 있습니다. 확인해봅시다! 아마 synchronization primitive를 거의 다 다룬 것 같고 sync.Cond는 직접 확인해도 어렵지 않아서 당분간은 다루지 않을 것 같습니다.


futex

이번 아티클에서 확인할 primitive는 runtime_SemacquireXXX 또는 runtime_Semrelease 함수를 이용하게 됩니다.

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/runtime.go

해당 함수는 런타임 패키지에서 구현되며, 해당 구현은 운영체제마다 달라집니다. 여기서는 리눅스 환경을 가정하므로 리눅스의 락에 대한 시스템 콜인 futex를 이용하게 됩니다.

리눅스의 경우에 대한 구현은 해당 라인부터 확인할 수 있습니다.

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/sema.go#L60

위의 SemacquireXXX는 아래의 sync_runtime_SemacquireXXX로 치환되고, Semrelease는 아래의 sync_runtime_Semrelease로 치환됩니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/sema.go#L60
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
// semacquireXXX의 경우
func sync_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
// semrelease의 경우
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

이때 semacquire1은 futex의 wait를 호출하고 semrelease1은 futex의 wake를 호출하여 각각 대기, 깨움의 역할을 한다고 이해하면 됩니다. 다시 단순하게 정리해봅시다.

  • SemacquireXXX: 시스템 콜 futex wait로 대기
  • Semrelease: 시스템 콜 futex wake로 깨움

sync.Mutex의 동작 원리

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/mutex.go#L34

먼저 가장 단순한 sync.Mutex부터 시작해봅니다. sync.Mutex의 필드는 굉장히 단순합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/mutex.go#L34
type Mutex struct {
	state int32
	sema  uint32
}

앞으로 보게 될 자료형도 위처럼 state와 sema를 기본적으로 가지고 있습니다. state는 mutex의 상태를 나타내기 위한 값으로 0인 값은 기본 상태, 그 외는 어떤 특별한 상태를 기술하게 됩니다. 한편 sema는 0이 아닌 상태를 가질 때 대기 중인 고루틴을 깨우는 역할을 수행합니다.

이번에는 mutex에서 많이 사용하는 메서드인 Lock과 Unlock에 대해서 살펴봅시다. 먼저 Lock입니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/mutex.go#L81
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 만약 바로 락을 획득할 수 있으면
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow() // 바로 락을 획득할 수 없으면 futex
}
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/mutex.go#L117
func (m *Mutex) lockSlow() {
	// 함수 초기화
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// ...
		// atomic CAS로 상태 비교
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 해당 부분에서 futex call 호출
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// 상태값 갱신 후 대기 탈출
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			// 비교 실패 시 대기
			old = m.state
		}
	}
	// ...
}

Lock 함수를 보면 크게 두 부분으로 이루어져 있습니다.

  1. 유저 모드에서 atomic 연산으로 바로 락을 획득할 수 있는 경우(기본 상태인 경우) 락 획득
  2. futex call로 락 획득 대기. 획득된 경우 mutex의 상태값을 갱신 후 함수 탈출

앞으로 함께 보게 될 primitive인 RWMutex, WaitGroup 또한 비슷하게 동작합니다.

  1. atomic swapping
  2. futex wait, wake

이렇게 atomic을 이용하여 구현된 이유는, 바로 락을 획득할 수 있는 경우 굳이 시스템 콜인 futex wait/wake를 이용하지 않아도 되기 때문입니다. 따라서 최대한 애플리케이션 레벨에서 동시성 처리를 보장하도록 구현됐다고 보면 좋을 것 같습니다.

이번에는 Unlock입니다. Unlock도 마찬가지로 atomic swapping 진행 후 futex wake를 호출합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/mutex.go#L212
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	// ...
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/mutex.go#L227
func (m *Mutex) unlockSlow(new int32) {
	// ...
	// 마찬가지로 futex로 대기중인 Lock 깨움
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}


sync.RWMutex의 동작 원리

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/rwmutex.go#L35

다음은 RWMutex입니다. 해당 구조체의 필드는 writer와 reader를 따로 관리하므로 writer에 대한 대기를 관리할 writerSem, reader에 대한 대기를 관리할 readerSem이 존재합니다. 나머지 필드는 메서드를 확인하면서 알아봅시다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/rwmutex.go#L35
type RWMutex struct {
	w           Mutex        // held if there are pending writers
	writerSem   uint32       // semaphore for writers to wait for completing readers
	readerSem   uint32       // semaphore for readers to wait for completing writers
	readerCount atomic.Int32 // number of pending readers
	readerWait  atomic.Int32 // number of departing readers
}

먼저 RLock입니다. reader의 Lock을 획득할 때 사용합니다. RLock만 이용하여 접근하면 여러 고루틴이 동시에 데이터를 읽을 수 있습니다. RLock도 단순하게 atomic rw.readerCount를 변화시키고 만약 writer가 Lock으로 대기 또는 실행중인 경우(이 경우 readerCount가 -rwmutexMaxReaders입니다) futex 대기합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/rwmutex.go#L63
func (rw *RWMutex) RLock() {
	// ...
	// atomic add 시도 후 add결과가 음수면, 즉 writer가 대기중이면
	// readerSem에 대한 futex wait
	if rw.readerCount.Add(1) < 0 { 
		// A writer is pending, wait for it.
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
	// ...
}

다음은 RUnlock입니다. readerCount를 줄입니다. 이때 음수인 경우, rUnlockSlow로직을 실행합니다. 내부에서는 실제 대기하는 reader 개수를 rw.readerCount로부터 가져오고 0에 도달하면 writerSem을 wake하여 writer에 대한 Lock이 활성화될 수 있도록 합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/rwmutex.go#L110
func (rw *RWMutex) RUnlock() {
	// ... 
	// atomic add 시도 후 add 결과가 음수면, 즉 writer가 대기중이면
	// 리더 수를 검사하고 0일 때 writerSem에 대한 futex wake
	if r := rw.readerCount.Add(-1); r < 0 { 
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	// ...
}

Lock입니다. writer의 Lock을 획득할 때 해당 함수를 호출하여 RLock을 방지하고 reader의 RUnlock을 대기합니다.

  • 다른 writer으로부터 중복 접근을 방지하기 위해 w.Lock을 호출
  • reader 수 계산
  • RLock으로 진입한 reader수가 0이 아니라면 writerSem에 대한 락 획득 대기
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/rwmutex.go#L140
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	// ...
	// 다른 writer와 경합에 대해서 Lock획득
	rw.w.Lock()
	// maxReader만큼 rw.readerCount를 줄임
	// r은 그대로 reader 수
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	// 활성화 reader에 대한 futex 대기
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
	// ...
}

따라서 만약 3개의 RLock이 먼저 발생하고 Lock호출, 3개의 RUnlock이 호출되면 다음 과정을 따르게 됩니다.

  1. readerCount를 3으로 증가
  2. rw.w.Lock을 호출하여 다른 writer 진입 방지
  3. reader를 음수로 만들어 reader 진입 방지
  4. 그러나 Lock 내부의 r은 3이므로 rw.writerSem 대기
  5. RLock을 획득한 고루틴에서 RUnlock시 rUnlockSlow 진입: 내부에서 RLock을 획득한 reader수 -1: 따라서 남은 reader 2개
  6. RLock을 획득한 고루틴에서 RUnlock시 rUnlockSlow 진입: 내부에서 RLock을 획득한 reader수 -1: 따라서 남은 reader 1개
  7. RLock을 획득한 고루틴에서 RUnlock시 rUnlockSlow 진입: 내부에서 RLock을 획득한 reader수 -1: 따라서 남은 reader 0개: writerSem release로 대기중인 Lock의 futex를 이용하여 writerSem 활성화

다음은 Unlock입니다. rwmutexMaxReaders만큼 빼준 값을 복구하고 대기 중인 모든 RLock을 깨워 락을 획득할 수 있게 합니다. rw.w.Unlock을 호출하여 다른 writer 고루틴이 접근할 수 있도록 합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/rwmutex.go#L197
func (rw *RWMutex) Unlock() {
	// ...
	// Announce to readers there is no active writer.
	
	// rw.readerCount 복구
	r := rw.readerCount.Add(rwmutexMaxReaders)
	// ...
	// 모든 RLock을 획득하려고 하는 고루틴 활성화
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// 다른 writer 깨우기
	rw.w.Unlock()
	// ...
}

sync.WaitGroup의 동작 원리

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/waitgroup.go#L23

WaitGroup의 내부 필드는 다음과 같습니다. 무언가 굉장히 없습니다. 마찬가지로 내부 상태 state와 고루틴의 대기를 이용하기 위한 sema로 이루어집니다. 이때 state는 두 부분으로 나눠지는데 상위 32비트는 Add로 추가된 값의 총 합, 하위 32비트는 WaitGroup.Wait로 대기하는 waiter의 개수를 의미합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/waitgroup.go#L23
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

먼저 Add입니다. state의 상위 32비트에 delta를 더하고 여태까지 더한 delta값을 v에 저장하고 waiter 고루틴의 개수를 w로 저장합니다, waiter가 없거나 delta가 0보다 크면 함수를 종료합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/waitgroup.go#L43
func (wg *WaitGroup) Add(delta int) {
	// ...
	state := wg.state.Add(uint64(delta) << 32) // state 상위 32비트에 delta를 더해서 넣어줌
	v := int32(state >> 32) // 상위 32비트는 여태까지 더한 delta의 합
	w := uint32(state) // wg.Wait()로 대기중인 고루틴의 수
	// ...
	if v > 0 || w == 0 { // 만약 대기 중인 고루틴이 없거나, delta가 0보다 크면 함수 종료
		return
	}
	// ...
	// Reset waiters count to 0. 
	// delta가 0이고 w가 0이 아닌 경우
	// state를 초기화하고 대기중인 고루틴 수 만큼 for루프가 돌면서 대기 중인 고루틴을 깨움
	wg.state.Store(0)
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0) // wg.sema를 양수로 바꿔 대기중인 고루틴 트리거
	}
}

한편 Done의 경우는 다음과 같습니다. delta를 -1로 하여 Add를 호출합니다. 만약 상위 32비트의 값이 0이라면 어떻게 될까요? 이때는 대기하는 waiter를 깨우기 위해 waiter 수 만큼 for 루프를 돌아 Semrelease로 futex wake를 실행합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/waitgroup.go#L86
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait의 경우입니다. 필드를 가져오고 wg의 CAS로 state를 증가시킵니다. 성공 시 Semacquire로 futex wait을 실행합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/sync/waitgroup.go#L91
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	// ...
	for {
		state := wg.state.Load()
		v := int32(state >> 32) // 상위 32비트는 여태까지 더한 delta의 합
		w := uint32(state) // wg.Wait()로 대기중인 고루틴의 수
		if v == 0 { // 이미 delta의 총합이 0이면 대기할 필요가 없음
			// ... 
			return
		}
		// 대기가 필요한 경우 대기중인 고루틴 수를 증가시키고 대기
		if wg.state.CompareAndSwap(state, state+1) {
			// ...
			runtime_Semacquire(&wg.sema) // wg.sema > 0 인 조건을 대기
			// ...
			return
		}
	}
}


지금까지 golang의 비동기 동작 원리와 관련된 sync.Mutex, RWMutex, WaitGroup의 구현 방식을 다뤄봤습니다.

동작은 약간 차이가 있지만 대체로 위에서 설명한 다음 과정을 따르게 됩니다.

  1. atomic swapping
  2. futex wait, wake

위 과정을 따르는 이유는 애플리케이션 레벨에서 동시성 처리를 조금 더 보장하여 시스템 콜을 줄이고 성능을 올리기 위함입니다.

각 세부 동작을 간추리면, sync.Mutex는 atomic 연산과 futex 시스템 콜을 통해 락을 관리하며, RWMutex는 reader와 writer의 대기를 별도로 관리합니다. WaitGroup은 상위 32비트로 작업 카운터를, 하위 32비트로 대기 중인 고루틴 수를 관리하여 동기화를 처리합니다

참고 자료

futex 사용법 - Google Search

futex(fast user mutex)

Leave a comment