8 minute read

해당 아티클은 다음 수준의 지식을 요구합니다.

  • 고루틴, 채널, select문과 같은 golang 비동기 기본
  • atomic에 존재하는 구조체에 대한 이해

다음 실행환경에서 동작합니다.

  • wsl2 Ubuntu-22.04
  • go1.22.2 linux/amd64

드디어 synchronization primitive 내부 동작 원리를 살펴봤습니다. 이번에는 select문에 우선순위를 주입하는 방법을 알아보고 지금까지 다룬 개념을 이용하여 고루틴, 채널, select, atomic등과 같은 도구로 golang내부에서 사용할 수 있는 이벤트루프를 구현해봅시다. 물론 아직 context는 다루지 않아서 나중에 다루겠지만 여기서는 단순하게 외부, 즉 다른 고루틴에서 취소를 주입할 수 있는 대상 정도로 이해해도 좋습니다.


select문 우선 순위 주입

기본적으로 select문은 바로 실행 가능한 케이스가 여러 개인 경우 그 중 랜덤으로 하나를 선택하여 실행합니다. select문의 디자인이 본래 그렇지만, 종종 케이스 별 우선순위를 지정하여 처리해야 되는 경우가 있습니다. 케이스 문에서 먼저 검증하기를 원하는 케이스가 있는 경우가 그렇습니다. 대체로 이런 경우는 비동기 이벤트 수신과 이벤트 수신 종료에 대한 이벤트가 하나의 select문에서 처리되는 경우입니다. 예를 들어 다음과 같습니다.

func (c *client) Send(ctx context.Context, e event) error {
	select {
	case <- ctx.Done():
		return ctx.Err()
	case c.eventChannel <- e:
		return nil
	}
}

context는 아직 다루지 않았지만 context.Context인터페이스는 다른 고루틴에서 흐름을 제어하고 싶을 때 사용합니다. 위의 코드에서 eventChannel에 모든 원소가 다 차게 되어 이벤트를 전달할 수 없는 경우 ctx가 외부 고루틴으로 인해 중지됐다면, 에러로 종료되는 케이스입니다. 그런데 이 경우에 Send가 호출되자마자 ctx.Done 채널이 닫혀있고 eventChannel에 event가 들어갈 자리가 존재한다면 두 케이스 중에 랜덤으로 선택하여 실행하게 됩니다. 그러나 위에서 ctx가 미리 종료된 상황이라면 해당 케이스를 먼저 감지해야 하지 않을까요? 이런 경우에 select문에서 케이스에 우선순위를 결정해야 합니다.

가장 쉽게 구현하는 방법은 다음 방식을 이용하는 것입니다.

func (c *client) Send(e event) error {
	select {
	case <- ctx.Done():
		return ctx.Err()
	default:
		select {
		case <- ctx.Done():
			return ctx.Err()
		case c.eventChannel <- e:
			return nil
		}
	}
}

해당 방식은 첫 번째 select문에서 ctx의 종료를 미리 감지하고 처리할 수 없는 경우 default로 넘어가 ctx.Done과 c.eventChannel에 데이터를 넣는 상태로 돌입합니다.

이를 좀 더 보기 쉬운 방식으로 코드의 depth를 줄이는 if문의 early return과 비슷하게 다음과 같이 처리할 수 있습니다.

func (c *client) Send(e event) error {
	select {
	case <- ctx.Done():
		return ctx.Err()
	default:
	}
	
	select {
	case <- ctx.Done():
		return ctx.Err()
	case c.eventChannel <- e:
		return nil
	}
}

이런 방식을 이용하면 Send가 호출되는 시점에 우선으로 처리하고 싶은 케이스를 먼저 선언하여 함수 호출과 동시에 여러 케이스를 처리할 수 있는 경우에 대한 랜덤 선택을 방지할 수 있습니다.

주의할 점은 위와 같이 select문에 대한 우선순위를 결정할 때 첫 번째 select문이 종료된 이후에 ctx가 종료되고 eventChannel에 자리가 쌓인 경우 다음 select 시 우선순위 보장을 할 수 없게 됩니다. 이때는 c.eventChannel에 원소를 집어넣을 수 있게 됩니다. 즉, 아래의 우선순위를 보장할 수 없는 위치까지 쓰레드가 실행하고 다음 select문을 실행하기 전 대기하고 있는 상황에서 여러 케이스가 실행 가능한 경우, 우선순위를 보장하지 못합니다.

func (c *client) Send(e event) error {
	select {
	case <- ctx.Done():
		return ctx.Err()
	default:
	}
	
	// 우선순위를 보장할 수 없는 위치
	
	select {
	case <- ctx.Done():
		return ctx.Err()
	case c.eventChannel <- e:
		return nil
	}
}

이런 상태에 대한 동시성은 보통 무시되곤 하지만 eventloop를 구현해보면서 유사 사례를 방어하는 코드를 작성해봅시다.


이벤트루프 구현하기

이벤트루프를 구현하기 전에 스펙을 잡아봅시다. 이벤트루프는 주입된 이벤트를 순차적이고 비동기적으로 실행하는 역할을 수행합니다. 따라서 다음의 메서드 구현을 목표로 합니다.

func (e *Eventloop[T]) Run() // 이벤트루프를 실행합니다.
func (e *Eventloop[T]) Send(ctx context.Context, event T) (err error) // 이벤트루프에 이벤트를 전달합니다.
func (e *Eventloop[T]) Close() // 이벤트루프를 정상 종료합니다.
func (e *Eventloop[T]) ForceClose() // 이벤트루프를 강제 종료합니다.

아래는 세부 구현 목표 및 고려사항입니다.

  1. Run 메서드로 이벤트루프 동작 시 유한 개의 고루틴이 이벤트를 큐에서 가져와 처리합니다(dispatch). Run 메서드는 이벤트루프가 종료되기 전까지 블로킹됩니다.
  2. Send 메서드로 이벤트루프에 이벤트 주입이 가능합니다. 이때 eventloop가 종료된 경우 Send에서 에러를 리턴합니다. Send 메서드에서 리턴이 nil이고 ForceClose가 호출되지 않았다면 실행을 보장합니다.
  3. Close 메서드로 이벤트루프의 정상 종료를 구현합니다. Close 메서드 호출 시 큐에 존재하는 모든 이벤트의 디스패칭을 기다리고 Run이 종료됩니다. 한편 Close이후에 호출되는 Send는 에러를 리턴합니다.
  4. ForceClose 메서드로 이벤트루프의 강제 종료를 구현합니다. ForceClose 메서드 호출 시 큐에 남은 이벤트에 상관없이 디스패칭을 기다리지 않고 Run이 종료됩니다. ForceClose이후에 호출되는 Send는 에러를 리턴합니다.
  5. Send 메서드는 이벤트루프에 큐가 다 찬 경우만 블로킹됩니다.

구현할 이벤트루프에 대한 생성자 및 메서드 시그니처를 다음과 같이 작성할 수 있습니다.

package ds

import "sync/atomic"

type Eventloop[T any] struct {
	queue         chan T
	handler       func(T)
	dispatchCount int
	closeCh       chan struct{}
	closed        atomic.Bool
}

func NewEventloop[T any](queueSize, dispatchCount int, handler func(T)) *Eventloop[T] {
	e := &Eventloop[T]{}
	e.queue = make(chan T, queueSize)
	e.handler = handler
	e.dispatchCount = dispatchCount
	e.closeCh = make(chan struct{})
	return e
}

func (e *Eventloop[T]) Run()
func (e *Eventloop[T]) Send(event T) error
func (e *Eventloop[T]) Close()
func (e *Eventloop[T]) ForceClose()

여기서 먼저 ForceClose를 구현하지 않고 Close호출 시 강제 종료하도록 구현해보겠습니다. 구현체를 아래와 같이 만들 수 있습니다.

package ds

import (
	"errors"
	"sync"
	"sync/atomic"
)

type Eventloop[T any] struct {
	queue         chan T
	handler       func(T)
	dispatchCount int
	closeCh       chan struct{}
	closed        atomic.Bool
}

func NewEventloop[T any](queueSize, dispatchCount int, handler func(T)) *Eventloop[T] {
	e := &Eventloop[T]{}
	e.queue = make(chan T, queueSize)
	e.handler = handler
	e.dispatchCount = dispatchCount
	e.closeCh = make(chan struct{})
	return e
}

func (e *Eventloop[T]) Run() {
	wg := sync.WaitGroup{}
	for range e.dispatchCount { // dispatchCount만큼 루프 고루틴을 실행합니다.
		wg.Add(1)
		go func() {
			defer wg.Done()
			e.dispatch()
		}()
	}

	wg.Wait()
}

func (e *Eventloop[T]) Send(event T) error {
	select {
	case <-e.closeCh: // 1. 해당 위치에 closeCh를 감지하여 Send에서 queue가 꽉 차 대기할 때나 queue가 닫혔을 때 호출되는 것을 방지합니다.
		return ErrAlreadyClosedLoop
	case e.queue <- event:
		return nil
	}
}

func (e *Eventloop[T]) Close() {
	if !e.closed.CompareAndSwap(false, true) {
		return
	}

	close(e.closeCh)
}

func (e *Eventloop[T]) dispatch() {
	for {
		select {
		case event := <-e.queue:
			e.handler(event)
		case <-e.closeCh: // 2. dispatch에서 closeCh를 감지하면 탈출하도록 합니다.
			return
		}
	}
}

var ErrAlreadyClosedLoop = errors.New("already closed loop")

주요 구현 부분은 다음과 같습니다.

  • Close 시 closeCh를 닫아 closeCh의 recv가 가능하도록 합니다.
  • Send 시 이벤트를 전달합니다. 이때 Close메서드가 호출되어 닫힘 이벤트가 발생하면, select 상태에서 빠져나갈 수 있도록 합니다.
  • dispatch 메서드 또한 이벤트를 처리하다 Close메서드가 호출되어 닫힘 이벤트가 발생하면, select 상태에서 빠져나갈 수 있도록 합니다.

주요 발생 문제입니다.

  • 만약 Send 시 select 구문을 대기하는 상태에서 다른 쓰레드로 인해 Close의 close(e.closeCh) 이후에 도달한 상태라면 select에서 어떤 case를 선택할 지 알 수 없습니다.
  • Close메서드가 호출됐다면 dispatch를 하는 고루틴이 Send시 nil로 리턴된 이벤트에 대하여 실행을 보장할 수 없습니다.

수정해본다면 다음을 고려해야 합니다.

  • dispatch와 Send에 select문 우선순위를 도입합니다.

Close호출 시 강제 종료할 때 안전하게 동작하도록 다시 수정해봅시다.

package ds

import (
	"errors"
	"sync"
	"sync/atomic"
)

type Eventloop[T any] struct {
	queue         chan T
	handler       func(T)
	dispatchCount int
	closeCh       chan struct{}
	closed        atomic.Bool
}

func NewEventloop[T any](queueSize, dispatchCount int, handler func(T)) *Eventloop[T] {
	e := &Eventloop[T]{}
	e.queue = make(chan T, queueSize)
	e.handler = handler
	e.dispatchCount = dispatchCount
	e.closeCh = make(chan struct{})
	return e
}

func (e *Eventloop[T]) Run() {
	wg := sync.WaitGroup{}
	for range e.dispatchCount { // dispatchCount만큼 루프 고루틴을 실행합니다.
		wg.Add(1)
		go func() {
			defer wg.Done()
			e.dispatch()
		}()
	}

	wg.Wait()
}

func (e *Eventloop[T]) Send(event T) error {
	select { // 1. select 도입으로 closeCh를 먼저 검사합니다.
	case <-e.closeCh:
		return ErrAlreadyClosedLoop
	default:
	}

	// 4. 그러나 이 위치에 고루틴이 n개가 대기하고 있는 상황에서
	// Close가 발생한 경우 문제가 생길 수 있습니다.
	// 왜냐하면 dispatch가 호출되기 전에 dispatch 고루틴이
	// 종료되기 때문입니다.

	select {
	case <-e.closeCh:
		return ErrAlreadyClosedLoop
	case e.queue <- event:
		return nil
	}
}

func (e *Eventloop[T]) Close() {
	if !e.closed.CompareAndSwap(false, true) {
		return
	}

	close(e.closeCh)
}

func (e *Eventloop[T]) dispatch() {
	for {
		// 2. 이벤트가 처리되는 속도보다 생성이 빠른 경우
		// 해당 위치에서 처리됩니다.
		select {
		case event := <-e.queue:
			e.handler(event)
			continue // 5. 또한 queue에 데이터가 많은 경우 강제종료되지 않습니다.
		default:
		}

		// 3. 이벤트가 처리되는 속도보다 생성이 느린 경우
		// 해당 위치에서 처리됩니다.
		select {
		case event := <-e.queue:
			e.handler(event)
		case <-e.closeCh:
			return
		}
	}
}

var ErrAlreadyClosedLoop = errors.New("already closed loop")

주요 구현 부분은 다음과 같습니다.

  • Send 시 select문의 우선순위를 closeCh가 더 높게 구현합니다.
  • dispatch 시 select 문의 우선순위를 queue가 더 높게 구현합니다.

이전에 언급한 문제의 방어 방법입니다.

  • Send시 select의 선택: select를 선택할 때 closeCh를 먼저 확인하여 닫힘에 대하여 안전하게 만들도록 합니다.
  • dispatch 시 select의 선택: dispatch시 queue를 먼저 탐색하여 Send에서 nil로 요청된 이벤트에 대하여 실행을 보장합니다. 그러나 아직 완벽하지 않습니다.

주요 발생 문제입니다.

  • Send시 4번 위치에 존재하는 고루틴이 많은 상태에 Close가 종료된 경우 이벤트의 실행이 nil로 될 수 있고 Err로 될 수 있습니다. 이때 nil로 리턴된 값에 대하여 queue에 데이터가 있는 경우 실행을 보장하지만 없는 경우 실행을 보장하지 못합니다.
  • Close시 queue에 데이터가 기존에 많은 경우 강제 종료하지 못합니다.

수정해본다면 다음을 고려해야 합니다.

  • closeCh이 닫히는 경우를 종료 예약의 개념으로 접근합니다: 이번에는 Close와 ForceClose를 분리해봅니다.
  • dispatch가 강제 종료될 수 있도록 구현합니다.
  • Send시 4번 위치에 고루틴이 있는 경우를 고려합니다.

이번 구현에서는 Close를 수정해야 하는데, 수정하면 ForceClose와 Close의 구분이 쉬워지므로 한 번 같이 진행해봅시다. Close호출 시 원래 queue에 존재하던 이벤트는 처리하고 종료하는 것을 예약하고 ForceClose는 존재하던 이벤트에 상관없이 종료되도록 구현해봅니다. 해결법은 다음과 같습니다.

// https://github.com/atgane/syncgo/blob/d81c5d9266c1ded5f7ea2f05040ac373893906b6/01/ds/eventloop.go#L1
package ds

import (
	"errors"
	"sync"
	"sync/atomic"
)

type Eventloop[T any] struct {
	queue         chan T
	handler       func(T)
	dispatchCount int
	closeCh       chan struct{}
	closed        atomic.Bool
	forceClosed   atomic.Bool // 강제 종료 필드를 추가합니다.
	state         atomic.Int32
}

func NewEventloop[T any](dispatchCount, queueSize int, handler func(T)) *Eventloop[T] {
	e := &Eventloop[T]{
		queue:         make(chan T, queueSize),
		handler:       handler,
		dispatchCount: dispatchCount,
		closeCh:       make(chan struct{}),
	}
	return e
}

func (e *Eventloop[T]) Run() {
	wg := sync.WaitGroup{}
	for range e.dispatchCount {
		wg.Add(1)
		go func() {
			defer wg.Done()
			e.dispatch()
		}()
	}
	wg.Wait()
}

func (e *Eventloop[T]) Send(event T) error {
	if e.closed.Load() {
		return ErrAlreadyClosedLoop
	}
	
	// 1. Send시 Send를 하는 고루틴을 카운팅합니다.
	
	e.state.Add(1)
	defer e.state.Add(-1)

	select {
	case <-e.closeCh:
		return ErrAlreadyClosedLoop
	default:
	}

	select {
	case <-e.closeCh:
		return ErrAlreadyClosedLoop
	case e.queue <- event:
		return nil
	}
}

func (e *Eventloop[T]) Close() {
	if !e.closed.CompareAndSwap(false, true) {
		return
	}

	close(e.closeCh)
}

func (e *Eventloop[T]) ForceClose() {
	if !e.closed.CompareAndSwap(false, true) {
		return
	}

	close(e.closeCh)
	for e.state.Load() != 0 {
	}
	e.forceClosed.Store(true)
}

func (e *Eventloop[T]) dispatch() {
	for {
		select {
		case event := <-e.queue:
			if e.forceClosed.Load() { // 3. 강제 종료조건을 체크합니다.
				return
			}
			e.handler(event)
			continue
		default:
		}

		select {
		case event := <-e.queue:
			if e.forceClosed.Load() {
				return
			}
			e.handler(event)
			// 2. 종료 예약이 걸린 경우, Send중인 고루틴이 있으면 재검사합니다.
			// 왜냐하면 Send 시 closeCh이 닫히면 Err로 리턴될지 nil로 리턴될 지 알 수 없기 때문입니다.
		case <-e.closeCh: 
			if e.state.Load() == 0 {
				return
			}
		}
	}
}

var ErrAlreadyClosedLoop = errors.New("already closed loop")

주요 구현 부분은 다음과 같습니다.

  • Close는 closed를 true로 변경하고 closeCh를 닫습니다.
  • dispatch는 select문을 분리하여 두 개의 select문으로 나누고 첫 번째 select문에서 queue에 이벤트가 쌓인 경우에 스핀하며 처리합니다. 만약 event가 모두 떨어진 상태에서 closeCh이 닫히는 경우 Send중인 고루틴이 있는지 검사하고 있으면 스핀합니다.

이벤트루프를 구현하면서 다음 테크닉을 이용했습니다.

  1. select문의 우선순위를 지정하여 원하는 케이스를 먼저 탐색합니다.
  2. atomic 자료형의 CompareAndSwap을 이용하여 여러 고루틴이 동시에 접근했을 때 하나의 고루틴에 대한 실행을 보장합니다.
  3. atomic counting을 활용해 실행중인 고루틴의 개수를 세고 해당 고루틴이 없을 때까지 스핀하며 대기합니다.

전체 코드

Leave a comment