10 minute read

해당 아티클은 다음 수준의 지식을 요구합니다.

  • 고루틴을 사용해본 경험
  • golang 채널 사용 방법

다음은 다루지 않습니다.

  • golang 채널을 활용하는 방법

다음 실행환경에서 동작합니다.

  • wsl2 Ubuntu-22.04
  • go1.22.2 linux/amd64

이번에는 이벤트 루프를 구현하기 전에 golang의 가장 핵심적인 요소인 채널의 동작 원리를 학습해보겠습니다. 원래는 atomic을 다루고 싶었는데, atomic에서 다룰만한 주제가 많지 않은 것 같고 좋은 자료가 더 많기 때문에 atomic 관련된 부분은 스킵하고 바로 채널이라는 주제로 들어가보겠습니다. 채널의 내부 구현도 재밌는 부분이 있어서 같이 확인해봅시다!


golang 채널 소개 & 필드

golang 채널은 고루틴 간의 데이터를 전달하기 위한 자료구조입니다. 내부는 원형 큐로 이루어져 있습니다. 보통 고루틴과 같이 논리적, 또는 물리적으로 구현된 비동기를 처리하는 대상 간 통신은 내부에서 큐를 활용합니다. golang에서 원형 큐를 쓰는 이유는 공식적인 자료를 확인한 것은 아니지만 원형 큐를 이용하여 캐시 친화적으로 동작하게 하기 위함이 아닐까 생각합니다. 왜냐하면 큐에 여러 데이터가 들어올 때 원형 큐의 경우, 큐에서 조회된 메모리 근방에 다음 원소가 존재하기 때문입니다. 이런 부분에서 채널의 send, recv 연산의 성능을 올리지 않았을까 추측해봅니다.

golang 채널의 필드는 다음과 같이 생겼습니다. 해당 링크에서 직접 소스 코드를 확인해보실 수 있습니다.

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L33

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L33
type hchan struct {
	qcount   uint // 버퍼 내 데이터 수
	dataqsiz uint // make(chan T, N)에서 N을 의미
	buf      unsafe.Pointer
	elemsize uint16 // sizeof(T)
	closed   uint32 // channel close state
	elemtype *_type
	sendx    uint  // buffer index
	recvx    uint  // buffer index
	recvq    waitq // waiting queue: 송신 대기 큐
	sendq    waitq // waiting queue: 수신 대기 큐

	lock mutex
}

type waitq struct { // 고루틴에 대한 큐
	first *sudog
	last  *sudog
}

type sudog struct { // 채널에서 대기 중인 고루틴을 표현하는 구조체
	g *g

	isSelect bool
	next     *sudog // 연결리스트용 값
	prev     *sudog
	elem     unsafe.Pointer // 송수신 대기 값
	c        *hchan         // 송수신 대기 채널
}

크게 특이한 점은 없습니다. ch := make(chan struct{}, 10) 과 같은 코드가 있다면, 주어진 정보는 채널의 타입, 크기가 주어집니다. 따라서 dataqsiz와 elemsize, elemtype은 우선 필수적인 필드입니다.

한편, 채널은 원형 큐이므로 원소에 대한 입출력이 존재하고 큐에 얼마만큼의 데이터가 남아있는지를 나타내는 qcount라는 필드가 존재합니다. 마찬가지로 sendx, recvx도 원형 큐에 데이터의 위치를 표현하기 위해 필요한 변수입니다.

채널은 close(ch) 과 같은 연산이 가능하기 때문에 closed 필드도 가지고 있습니다.

그리고 구독, 발행하는 고루틴을 큐로 소유하는 recvq, sendq, 잠금을 위한 뮤텍스의 필드를 가집니다.

buf는 버퍼 채널에서 버퍼 공간을 의미하는 필드입니다.

이번에는 채널에 송수신하는 코드를 직접 살펴봅시다. 해당 코드는 서로 굉장히 유사하며 대칭적인 구조를 살펴볼 수 있습니다. 시작하기 전에 아래 부분을 기억해두면 굉장히 좋습니다.

채널 원소 송신 시: sendx += 1

채널 원소 수신 시: recvx += 1


send가 일어나는 경우: ch ← x

채널에 메세지를 전달했을 때 코드는 여기서 확인할 수 있습니다. runtime/chansend1에서 runtime/chansend 함수가 호출됩니다.

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L160

해당 함수를 따라가며 분석해보겠습니다. 먼저 nil 채널에 전달하면 에러 처리를 진행하고 고루틴의 작동을 중단합니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L161
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// ...
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

닫힌 채널로 전달하면, 즉 이전에 close(ch)가 호출된 경우 패닉이 발생합니다.

func chansend(
	// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L204
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// ...
}

채널이 데이터를 수신받기를 기다리는 경우입니다. 이 경우에 recvq라는 채널의 송신 대기 큐에서 대기 중인 고루틴을 가져옵니다. 이 부분이 나타내는 의미는 다음과 같습니다.

  • 채널에서 원소를 꺼내가려는 고루틴이 존재합니다. 따라서 recvq에 고루틴이 존재합니다.
  • recvq에 고루틴이 대기하고 있다는 의미는 채널에 원소가 없어 채널이 데이터를 수신받기를 기다리고 있다는 말입니다. 채널 관점에서 recvq는 채널이 송신받기를 기다리는 큐이므로 송신 대기 큐입니다.
func chansend(
	// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L209
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

send 함수 내부를 살펴봅시다. send 함수를 보면 고루틴에 데이터를 송신하고 recvq에 대기하고 있던 sudog에서 고루틴을 가져와서 goready를 통해 상태를 스케줄러가 실행할 수 있는 상태로 변경하는 것을 확인할 수 있습니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L294
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	//...
	if sg.elem != nil { // 수신자의 목적지가 유효하다면(아닌 경우는 <- ch 이렇게 응답값을 사용하지 않는 경우입니다)
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

위의 버퍼가 있는 채널에서 send시 recv하는 고루틴이 블록된 경우를 그림으로 살펴보면 다음과 같습니다.

제목 없는 프레젠테이션.png

이어서 채널 버퍼에 원소를 추가하는 경우입니다. 버퍼 채널이라면 아래 로직을 검증합니다. qcount와 dataqsiz를 비교하는데, 큐에 존재하는 원소의 개수가 데이터 큐 사이즈보다 작을 때 채널의 sendx에 따라 send위치를 가져오고 ep값, 즉 데이터를 qp로 가져오고 sendx를 증가시킵니다. 만약 sendx가 dataqsiz와 같아졌다면 sendx를 초기화합니다.

func chansend(
	// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L216
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx) // 메모리 위치 가져오기
		// ...
		typedmemmove(c.elemtype, qp, ep) // 메모리 위치에 데이터 복사
		c.sendx++ // sendx 플래그값 증가
		if c.sendx == c.dataqsiz { // 만약 dataqsize와 같다면 0으로 초기화
			c.sendx = 0
		}
		c.qcount++ // 큐의 원소 개수 늘림
		unlock(&c.lock)
		return true
	}

제목 없는 프레젠테이션 (1).png

해당 분기를 타지 못한 경우는 어떤 경우일까요? 바로 채널의 qcount가 dataqsiz와 같을 때입니다. 이 경우는 고루틴이 대기 로직을 타야 합니다. 해당 로직은 아래 코드와 같이 동작합니다.

  1. sudog를 만들고
  2. 채널의 sendq에 넣고
  3. 채널에 원소를 넣는 고루틴 상태 대기로 변경 및 대기
  4. 깨우고 sudog 제거합니다.
func chansend(
	// ...
) bool {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L237
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()           // 고루틴을 가져옵니다.
	mysg := acquireSudog() // 큐에서 대기를 위해 sudog 구조체를 생성합니다.
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// 필드 초기화
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // <- 여기서 send queue에 대기시켜두고
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	// 고루틴의 상태를 대기로 변경합니다.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // <- goroutine을 현재 waiting 상태에 두고 run queue에서 제거
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	// ...
	
	// 이후 깨면 필드 재설정 후 sudog를 제거합니다.
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	// ...
	return true

제목 없는 프레젠테이션 (2).png


recv가 일어나는 경우 x := ← ch

채널에서 메세지를 받을 때 코드는 여기서 확인할 수 있습니다. runtime/chanrecv1에서 runtime/chanrecv 함수가 호출됩니다.

https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L457

해당 함수를 따라가며 분석해보겠습니다. 먼저 nil 채널에 전달하면 에러 처리를 진행하고 고루틴의 작동을 중단합니다. 밑의 !block && empty(c)절은 당장은 무시합니다. 해당 코드는 나중에 확인해보겠습니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L465
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// ...
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}
	
	if !block && empty(c) {
	  // ...
	}

닫힌 채널이면 qcount가 0일 때 수신자의 메모리를 초기화합니다. 채널이 열려 있는 경우에는 sendq에서 대기 중인 고루틴이 있는지 확인합니다. 채널이 데이터를 수신해가기를 기다리는 경우입니다. 이 경우에 sendq라는 채널의 수신 대기 큐에서 대기 중인 고루틴을 가져옵니다. 이 부분이 나타내는 의미는 다음과 같습니다.

  • 채널에 원소를 넣으려는 고루틴이 존재합니다. 따라서 sendq에 고루틴이 존재합니다.
  • sendq에 고루틴이 대기하고 있다는 의미는 채널이 꽉 차서 채널이 데이터를 수신해가기를 기다리고 있다는 말입니다. 채널 관점에서 sendq는 채널이 수신되기를 기다리는 큐이므로 수신 대기 큐입니다.
func chanrecv(
	// ...
) (selected, received bool) {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L513
	if c.closed != 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// ...
	} else {
		// ...
		if sg := c.sendq.dequeue(); sg != nil {
			// ...
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

recv 함수 내부를 살펴봅시다. recv 함수 내부는 send보다는 복잡하게 처리합니다. 좀 복잡해서 주석으로 동작을 명시했습니다. 요약해보면 다음과 같습니다.

  1. 데이터 큐 사이즈가 0이면 바로 직접 전달합니다.
  2. 그 외는 꽉 찬 경우입니다. 채널 내부의 데이터에서 큐 맨 앞의 데이터를 송신받을 고루틴에게 전달합니다.
  3. 그럼 받은 위치는 비어지게 됩니다. 해당 위치에 sendq에서 나온 고루틴이 전달할 데이터를 적재합니다. 그럼 해당 위치는 큐의 마지막 인덱스가 됩니다.
  4. recvx, sendx의 값을 증가시키고 회전합니다(버퍼가 꽉 차있으므로 둘의 값은 같습니다).
  5. 남은 필드를 정리합니다.
  6. sendq에서 나온 고루틴을 활성 상태로 변경합니다.
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L615
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 { // 큐 사이즈가 0이면
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil { // 바로 전달
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else { // 버퍼가 꽉 찼다면
		// ...
		qp := chanbuf(c, c.recvx) // <- qp는 recvx의 위치를 가리킵니다
		// ... 
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) // <- qp의 데이터를 ep로 복사합니다. ep는 수신자가 받을 메모리 주소
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem) // <- 여기서 sg는 send를 하는 고루틴입니다. 송신자의 데이터를 qp로 복사: 꽉찼기 때문!!
		// 꽉찼으니까 뺀 자리에 sendq의 고루틴이 전달하는 데이터를 바로 추가합니다.
		c.recvx++ // recvx를 호출함으로 인해 sg의 데이터가 제일 마지막 순번으로 가집니다!!
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

위의 버퍼가 있는 채널에서 recv시 send하는 고루틴이 블록된 경우를 그림으로 살펴보면 다음과 같습니다.

제목 없는 프레젠테이션 (3).png

이어서 채널 버퍼에서 원소를 꺼내는 경우입니다. 버퍼 채널이라면 다음처럼 큐에서 원소를 빼고 recv를 호출한 고루틴으로 원소를 옮기는 작업을 진행합니다.

func chanrecv(
	// ...
) (selected, received bool) {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L537
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx) // qp위치를 가져오고
		// ... 
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) // qp값 ep로 복사
		}
		typedmemclr(c.elemtype, qp) // qp 메모리 클리어
		c.recvx++ // recvx 값 증가 시킴
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount-- // 원소 개수를 지움
		unlock(&c.lock)
		return true, true
	}

제목 없는 프레젠테이션 (4).png

마지막은 채널에 원소가 없는 경우입니다. 이때는 대기 로직으로 돌입합니다. 해당 로직은 아래 코드와 같이 동작합니다.

  1. sudog를 만들고
  2. 채널의 recvq에 넣고
  3. 채널에 원소를 받는 고루틴 상태 대기로 변경 및 대기
  4. 깨우고 sudog 제거
func chanrecv(
	// ...
) (selected, received bool) {
// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/chan.go#L561
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

제목 없는 프레젠테이션 (5).png


채널에 대한 글은 좋은 글이 많아서 다른 글을 참고하시는 게 더 좋을지도 모릅니다. 그러나 앞으로 채널을 이용한 자료 구조와 태크닉을 정리해볼 계획이 있어서 구현 내부를 탐색해봤습니다. 위의 동작을 다 이해할 필요는 없겠지만 이 정도만 정리하면 좋을 것 같습니다.

  1. 채널의 구현은 원형 큐로 이루어져 있습니다. 그래서 원형 큐의 특징을 가져갑니다. (크기 제한, 캐시 친화, 추가적인 메모리 할당 X)
  2. 고루틴 간 데이터를 전달할 때 채널을 통해 전달되고, 송수신하는 고루틴은 채널 내부에 sudog 형태로 관리됩니다.
  3. 스케줄러는 고루틴이 채널에 송수신시 대기가 걸리면 고루틴의 상태를 바꿔 대기 상태로 설정하고 채널의 내부 큐에 고루틴을 링크드 리스트 형태로 저장합니다.
  4. 대기 중인 고루틴이 해제되면 send, recv의 동작에 따라 메모리를 복사하여 값을 전달하고 원형 큐 내부 데이터를 수정합니다.

다음에 다뤄볼 주제는 다음과 같습니다.

  • select 문의 동작 과정
  • golang synchroization primitive를 활용해서 뭔가(?) 만들기

참고자료

golang - 채널의 내부 구조 - jacking75

Diving Deep Into The Golang Channels.

GopherCon 2017: Filippo Valsorda - Encrypting the Internet with Go

Leave a comment