golang 비동기. synchronization primitive(2). channel 송수신의 동작 원리
해당 아티클은 다음 수준의 지식을 요구합니다.
- 고루틴을 사용해본 경험
- golang 채널 사용 방법
다음은 다루지 않습니다.
- golang 채널을 활용하는 방법
다음 실행환경에서 동작합니다.
- wsl2 Ubuntu-22.04
- go1.22.2 linux/amd64
이번에는 이벤트 루프를 구현하기 전에 golang의 가장 핵심적인 요소인 채널의 동작 원리를 학습해보겠습니다. 원래는 atomic을 다루고 싶었는데, atomic에서 다룰만한 주제가 많지 않은 것 같고 좋은 자료가 더 많기 때문에 atomic 관련된 부분은 스킵하고 바로 채널이라는 주제로 들어가보겠습니다. 채널의 내부 구현도 재밌는 부분이 있어서 같이 확인해봅시다!
golang 채널 소개 & 필드
golang 채널은 고루틴 간의 데이터를 전달하기 위한 자료구조입니다. 내부는 원형 큐로 이루어져 있습니다. 보통 고루틴과 같이 논리적, 또는 물리적으로 구현된 비동기를 처리하는 대상 간 통신은 내부에서 큐를 활용합니다. golang에서 원형 큐를 쓰는 이유는 공식적인 자료를 확인한 것은 아니지만 원형 큐를 이용하여 캐시 친화적으로 동작하게 하기 위함이 아닐까 생각합니다. 왜냐하면 큐에 여러 데이터가 들어올 때 원형 큐의 경우, 큐에서 조회된 메모리 근방에 다음 원소가 존재하기 때문입니다. 이런 부분에서 채널의 send, recv 연산의 성능을 올리지 않았을까 추측해봅니다.
golang 채널의 필드는 다음과 같이 생겼습니다. 해당 링크에서 직접 소스 코드를 확인해보실 수 있습니다.
https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L33
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L33
type hchan struct {
qcount uint // 버퍼 내 데이터 수
dataqsiz uint // make(chan T, N)에서 N을 의미
buf unsafe.Pointer
elemsize uint16 // sizeof(T)
closed uint32 // channel close state
elemtype *_type
sendx uint // buffer index
recvx uint // buffer index
recvq waitq // waiting queue: 송신 대기 큐
sendq waitq // waiting queue: 수신 대기 큐
lock mutex
}
type waitq struct { // 고루틴에 대한 큐
first *sudog
last *sudog
}
type sudog struct { // 채널에서 대기 중인 고루틴을 표현하는 구조체
g *g
isSelect bool
next *sudog // 연결리스트용 값
prev *sudog
elem unsafe.Pointer // 송수신 대기 값
c *hchan // 송수신 대기 채널
}
크게 특이한 점은 없습니다. ch := make(chan struct{}, 10) 과 같은 코드가 있다면, 주어진 정보는 채널의 타입, 크기가 주어집니다. 따라서 dataqsiz와 elemsize, elemtype은 우선 필수적인 필드입니다.
한편, 채널은 원형 큐이므로 원소에 대한 입출력이 존재하고 큐에 얼마만큼의 데이터가 남아있는지를 나타내는 qcount라는 필드가 존재합니다. 마찬가지로 sendx, recvx도 원형 큐에 데이터의 위치를 표현하기 위해 필요한 변수입니다.
채널은 close(ch) 과 같은 연산이 가능하기 때문에 closed 필드도 가지고 있습니다.
그리고 구독, 발행하는 고루틴을 큐로 소유하는 recvq, sendq, 잠금을 위한 뮤텍스의 필드를 가집니다.
buf는 버퍼 채널에서 버퍼 공간을 의미하는 필드입니다.
이번에는 채널에 송수신하는 코드를 직접 살펴봅시다. 해당 코드는 서로 굉장히 유사하며 대칭적인 구조를 살펴볼 수 있습니다. 시작하기 전에 아래 부분을 기억해두면 굉장히 좋습니다.
채널 원소 송신 시: sendx += 1
채널 원소 수신 시: recvx += 1
send가 일어나는 경우: ch ← x
채널에 메세지를 전달했을 때 코드는 여기서 확인할 수 있습니다. runtime/chansend1에서 runtime/chansend 함수가 호출됩니다.
https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L160
해당 함수를 따라가며 분석해보겠습니다. 먼저 nil 채널에 전달하면 에러 처리를 진행하고 고루틴의 작동을 중단합니다.
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L161
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
닫힌 채널로 전달하면, 즉 이전에 close(ch)가 호출된 경우 패닉이 발생합니다.
func chansend(
// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L204
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// ...
}
채널이 데이터를 수신받기를 기다리는 경우입니다. 이 경우에 recvq라는 채널의 송신 대기 큐에서 대기 중인 고루틴을 가져옵니다. 이 부분이 나타내는 의미는 다음과 같습니다.
- 채널에서 원소를 꺼내가려는 고루틴이 존재합니다. 따라서 recvq에 고루틴이 존재합니다.
- recvq에 고루틴이 대기하고 있다는 의미는 채널에 원소가 없어 채널이 데이터를 수신받기를 기다리고 있다는 말입니다. 채널 관점에서 recvq는 채널이 송신받기를 기다리는 큐이므로 송신 대기 큐입니다.
func chansend(
// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L209
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
send 함수 내부를 살펴봅시다. send 함수를 보면 고루틴에 데이터를 송신하고 recvq에 대기하고 있던 sudog에서 고루틴을 가져와서 goready를 통해 상태를 스케줄러가 실행할 수 있는 상태로 변경하는 것을 확인할 수 있습니다.
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L294
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//...
if sg.elem != nil { // 수신자의 목적지가 유효하다면(아닌 경우는 <- ch 이렇게 응답값을 사용하지 않는 경우입니다)
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
위의 버퍼가 있는 채널에서 send시 recv하는 고루틴이 블록된 경우를 그림으로 살펴보면 다음과 같습니다.

이어서 채널 버퍼에 원소를 추가하는 경우입니다. 버퍼 채널이라면 아래 로직을 검증합니다. qcount와 dataqsiz를 비교하는데, 큐에 존재하는 원소의 개수가 데이터 큐 사이즈보다 작을 때 채널의 sendx에 따라 send위치를 가져오고 ep값, 즉 데이터를 qp로 가져오고 sendx를 증가시킵니다. 만약 sendx가 dataqsiz와 같아졌다면 sendx를 초기화합니다.
func chansend(
// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L216
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 메모리 위치 가져오기
// ...
typedmemmove(c.elemtype, qp, ep) // 메모리 위치에 데이터 복사
c.sendx++ // sendx 플래그값 증가
if c.sendx == c.dataqsiz { // 만약 dataqsize와 같다면 0으로 초기화
c.sendx = 0
}
c.qcount++ // 큐의 원소 개수 늘림
unlock(&c.lock)
return true
}

해당 분기를 타지 못한 경우는 어떤 경우일까요? 바로 채널의 qcount가 dataqsiz와 같을 때입니다. 이 경우는 고루틴이 대기 로직을 타야 합니다. 해당 로직은 아래 코드와 같이 동작합니다.
- sudog를 만들고
- 채널의 sendq에 넣고
- 채널에 원소를 넣는 고루틴 상태 대기로 변경 및 대기
- 깨우고 sudog 제거합니다.
func chansend(
// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L237
// Block on the channel. Some receiver will complete our operation for us.
gp := getg() // 고루틴을 가져옵니다.
mysg := acquireSudog() // 큐에서 대기를 위해 sudog 구조체를 생성합니다.
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 필드 초기화
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // <- 여기서 send queue에 대기시켜두고
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
// 고루틴의 상태를 대기로 변경합니다.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // <- goroutine을 현재 waiting 상태에 두고 run queue에서 제거
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// ...
// 이후 깨면 필드 재설정 후 sudog를 제거합니다.
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
// ...
return true

recv가 일어나는 경우 x := ← ch
채널에서 메세지를 받을 때 코드는 여기서 확인할 수 있습니다. runtime/chanrecv1에서 runtime/chanrecv 함수가 호출됩니다.
https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L457
해당 함수를 따라가며 분석해보겠습니다. 먼저 nil 채널에 전달하면 에러 처리를 진행하고 고루틴의 작동을 중단합니다. 밑의 !block && empty(c)절은 당장은 무시합니다. 해당 코드는 나중에 확인해보겠습니다.
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L465
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && empty(c) {
// ...
}
닫힌 채널이면 qcount가 0일 때 수신자의 메모리를 초기화합니다. 채널이 열려 있는 경우에는 sendq에서 대기 중인 고루틴이 있는지 확인합니다. 채널이 데이터를 수신해가기를 기다리는 경우입니다. 이 경우에 sendq라는 채널의 수신 대기 큐에서 대기 중인 고루틴을 가져옵니다. 이 부분이 나타내는 의미는 다음과 같습니다.
- 채널에 원소를 넣으려는 고루틴이 존재합니다. 따라서 sendq에 고루틴이 존재합니다.
- sendq에 고루틴이 대기하고 있다는 의미는 채널이 꽉 차서 채널이 데이터를 수신해가기를 기다리고 있다는 말입니다. 채널 관점에서 sendq는 채널이 수신되기를 기다리는 큐이므로 수신 대기 큐입니다.
func chanrecv(
// ...
) (selected, received bool) {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L513
if c.closed != 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// ...
} else {
// ...
if sg := c.sendq.dequeue(); sg != nil {
// ...
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
recv 함수 내부를 살펴봅시다. recv 함수 내부는 send보다는 복잡하게 처리합니다. 좀 복잡해서 주석으로 동작을 명시했습니다. 요약해보면 다음과 같습니다.
- 데이터 큐 사이즈가 0이면 바로 직접 전달합니다.
- 그 외는 꽉 찬 경우입니다. 채널 내부의 데이터에서 큐 맨 앞의 데이터를 송신받을 고루틴에게 전달합니다.
- 그럼 받은 위치는 비어지게 됩니다. 해당 위치에 sendq에서 나온 고루틴이 전달할 데이터를 적재합니다. 그럼 해당 위치는 큐의 마지막 인덱스가 됩니다.
- recvx, sendx의 값을 증가시키고 회전합니다(버퍼가 꽉 차있으므로 둘의 값은 같습니다).
- 남은 필드를 정리합니다.
- sendq에서 나온 고루틴을 활성 상태로 변경합니다.
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L615
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { // 큐 사이즈가 0이면
if raceenabled {
racesync(c, sg)
}
if ep != nil { // 바로 전달
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else { // 버퍼가 꽉 찼다면
// ...
qp := chanbuf(c, c.recvx) // <- qp는 recvx의 위치를 가리킵니다
// ...
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // <- qp의 데이터를 ep로 복사합니다. ep는 수신자가 받을 메모리 주소
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem) // <- 여기서 sg는 send를 하는 고루틴입니다. 송신자의 데이터를 qp로 복사: 꽉찼기 때문!!
// 꽉찼으니까 뺀 자리에 sendq의 고루틴이 전달하는 데이터를 바로 추가합니다.
c.recvx++ // recvx를 호출함으로 인해 sg의 데이터가 제일 마지막 순번으로 가집니다!!
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
위의 버퍼가 있는 채널에서 recv시 send하는 고루틴이 블록된 경우를 그림으로 살펴보면 다음과 같습니다.

이어서 채널 버퍼에서 원소를 꺼내는 경우입니다. 버퍼 채널이라면 다음처럼 큐에서 원소를 빼고 recv를 호출한 고루틴으로 원소를 옮기는 작업을 진행합니다.
func chanrecv(
// ...
) (selected, received bool) {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L537
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx) // qp위치를 가져오고
// ...
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // qp값 ep로 복사
}
typedmemclr(c.elemtype, qp) // qp 메모리 클리어
c.recvx++ // recvx 값 증가 시킴
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount-- // 원소 개수를 지움
unlock(&c.lock)
return true, true
}

마지막은 채널에 원소가 없는 경우입니다. 이때는 대기 로직으로 돌입합니다. 해당 로직은 아래 코드와 같이 동작합니다.
- sudog를 만들고
- 채널의 recvq에 넣고
- 채널에 원소를 받는 고루틴 상태 대기로 변경 및 대기
- 깨우고 sudog 제거
func chanrecv(
// ...
) (selected, received bool) {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L561
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
if c.timer != nil {
blockTimerChan(c)
}
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}

채널에 대한 글은 좋은 글이 많아서 다른 글을 참고하시는 게 더 좋을지도 모릅니다. 그러나 앞으로 채널을 이용한 자료 구조와 태크닉을 정리해볼 계획이 있어서 구현 내부를 탐색해봤습니다. 위의 동작을 다 이해할 필요는 없겠지만 이 정도만 정리하면 좋을 것 같습니다.
- 채널의 구현은 원형 큐로 이루어져 있습니다. 그래서 원형 큐의 특징을 가져갑니다. (크기 제한, 캐시 친화, 추가적인 메모리 할당 X)
- 고루틴 간 데이터를 전달할 때 채널을 통해 전달되고, 송수신하는 고루틴은 채널 내부에 sudog 형태로 관리됩니다.
- 스케줄러는 고루틴이 채널에 송수신시 대기가 걸리면 고루틴의 상태를 바꿔 대기 상태로 설정하고 채널의 내부 큐에 고루틴을 링크드 리스트 형태로 저장합니다.
- 대기 중인 고루틴이 해제되면 send, recv의 동작에 따라 메모리를 복사하여 값을 전달하고 원형 큐 내부 데이터를 수정합니다.
다음에 다뤄볼 주제는 다음과 같습니다.
- select 문의 동작 과정
- golang synchroization primitive를 활용해서 뭔가(?) 만들기
참고자료
golang - 채널의 내부 구조 - jacking75
Diving Deep Into The Golang Channels.
GopherCon 2017: Filippo Valsorda - Encrypting the Internet with Go
Leave a comment