8 minute read

해당 아티클은 다음 수준의 지식을 요구합니다.

  • 고루틴을 사용해본 경험

다음 실행 환경에서 동작합니다.

  • wsl2 Ubuntu-22.04
  • go1.23.3 linux/amd64

지금까지 net 패키지로 tcp 서버를 구현하기 위한 golang api를 살펴보고 unix syscall을 직접 하는 방식으로 재구현했습니다. 앞선 아티클에서 tcp 서버를 구현하는 방식은 클라이언트의 연결마다 고루틴을 생성하는 방식으로 구현했습니다. 이번에는 io 멀티플렉싱이라는 방식을 활용하여 상수 개수의 고루틴을 이용하여 tcp 서버를 구현하는 방법을 알아봅시다.


epoll 소개

프로세스는 파일을 제어할 때 fd(file discriptor)라는 값을 사용합니다. 이때 소켓 또한 마찬가지로 프로세스에서 파일 디스크립터 형태로 관리됩니다. epoll은 이러한 파일 디스크립터를 다중화(multiplexing)할 수 있는 리눅스 커널의 자료구조입니다. epoll의 동작은 다음과 같이 요약할 수 있습니다.

golang  비동기 시리즈 비공개 (1).png

epoll을 사용하기 위해 epoll을 생성합니다. 애플리케이션에서 epoll_create, epoll_create1함수를 이용하여 epoll 자료구조를 커널에 생성할 수 있습니다.

golang  비동기 시리즈 비공개 (2).png

epoll을 생성했다면, epoll_ctl을 이용하여 epoll에 소켓 fd를 등록합니다. 이때 소켓에서 어떤 이벤트를 어떤 방식으로 받을 것인지 선택할 수 있습니다. 자세한 사항은 코드를 보면서 진행하도록 하겠습니다.

golang  비동기 시리즈 비공개.png

epoll에 소켓 fd의 이벤트 감지를 등록한 후, epoll_wait를 이용하여 클라이언트로부터 요청이 수집된 소켓 fd에 대한 리스트를 받아서 처리할 수 있습니다.

간단하게 epoll에 대한 사용법을 요약하면 아래와 같습니다:

  1. epoll을 생성
  2. epoll에 소켓 fd등록
  3. 다른 쓰레드에서 반복해서 epoll을 대기하여 발생하는 이벤트를 수집하여 처리

echo 서버의 unix syscall 기반 코드와 epoll 기반 코드 비교

이전 아티클에서는 unix POSIX syscall을 golang에 래핑한 unix 패키지를 이용하여 tcp 서버를 구현했습니다. 해당 서버를 구현하기 위해 socket(), bind(), listen(), read(), write()와 같은 기본적인 syscall을 다뤘습니다. 클라이언트의 연결마다 fd(file discriptor)가 리턴됐고 해당 파일 디스크립터를 고루틴을 통해 비동기로 r/w를 하도록 구현했습니다.

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	// 1. 서버측 소켓 생성 후 소켓에 대한 file discriptor 소유
	fd, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
	if err != nil {
		fmt.Println("socket error", err)
		return
	}

	// 2. setsopckopt으로 fd에 소켓 옵션 지정
	// 여기서는 REUSEADDR 플래그 활성화
	if err := unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
		fmt.Println("set sock opt error", err)
		return
	}

	// // 3. 서버측 fd에 대한 blocking 설정
	// if err := unix.SetNonblock(fd, true); err != nil {
	// 	fmt.Println("set non-block error", err)
	// 	return
	// }

	// 4. ipv4기반 주소 생성 및 서버측 소켓과 주소 바인딩
	addr := &unix.SockaddrInet4{}
	addr.Addr = [4]byte{0, 0, 0, 0}
	addr.Port = 4999
	if err := unix.Bind(fd, addr); err != nil {
		fmt.Println("bind error", err)
		return
	}

	// 5. backlog queue 사이즈 설정 및 서버 호스팅
	queueSize := 1024
	if err := unix.Listen(fd, queueSize); err != nil {
		fmt.Println("listen error", err)
		return
	}

	for {
		// 6. accept를 호출하여 클라이언트 연결 대기
		// Accept4호출 이유는 클라이언트측 소켓에 옵션을 부여하여 생성할 수 있기 때문
		// nfd, _, err := unix.Accept4(fd, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
		nfd, _, err := unix.Accept4(fd, unix.SOCK_CLOEXEC)
		if err != nil {
			fmt.Println("accept4 error", err)
			continue
		}

		// 7. accept의 리턴으로 받은 클라이언트측 소켓에 대한 file discriptor를 통해
		// r/w를 진행
		go func(nfd int) {
			b := make([]byte, 1024)
			for {
				if err := logic(nfd, b); err != nil {
					fmt.Println("fd error", err)
					unix.Close(int(nfd))
					break
				}
			}
		}(nfd)
	}
}

func logic(fd int, b []byte) error {
	r, err := unix.Read(fd, b)
	if err != nil {
		return err
	}

	if r == 0 {
		return fmt.Errorf("read EOF")
	}

	_, err = unix.Write(fd, b[:r])
	if err != nil {
		return err
	}
	return nil
}

위의 코드에서 epoll을 이용한 io 멀티플렉싱을 활용하여 고루틴 수를 감소시키는 코드로 변경해보겠습니다. 전체적인 코드는 다음과 같습니다.

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	// 1. 서버측 소켓 생성 후 소켓에 대한 file discriptor 소유
	fd, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
	if err != nil {
		fmt.Println("socket error", err)
		return
	}

	// 2. setsopckopt으로 fd에 소켓 옵션 지정
	// 여기서는 REUSEADDR 플래그 활성화
	if err := unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
		fmt.Println("set sock opt error", err)
		return
	}

	// // 3. 서버측 fd에 대한 blocking 설정
	// if err := unix.SetNonblock(fd, true); err != nil {
	// 	fmt.Println("set non-block error", err)
	// 	return
	// }

	// 4. ipv4기반 주소 생성 및 서버측 소켓과 주소 바인딩
	addr := &unix.SockaddrInet4{}
	addr.Addr = [4]byte{0, 0, 0, 0}
	addr.Port = 4999
	if err := unix.Bind(fd, addr); err != nil {
		fmt.Println("bind error", err)
		return
	}

	// 5. backlog queue 사이즈 설정 및 서버 호스팅
	queueSize := 1024
	if err := unix.Listen(fd, queueSize); err != nil {
		fmt.Println("listen error", err)
		return
	}

	// 6. epoll fd 생성
	epollfd, err := unix.EpollCreate1(0)
	if err != nil {
		fmt.Println("epollcreate error", err)
		return
	}

	go func() {
		for {
			// 7. accept를 호출하여 클라이언트 연결 대기
			// Accept4호출 이유는 클라이언트측 소켓에 옵션을 부여하여 생성할 수 있기 때문
			// nfd, _, err := unix.Accept4(fd, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
			nfd, _, err := unix.Accept4(fd, unix.SOCK_CLOEXEC)
			if err != nil {
				fmt.Println("accept4 error", err)
				continue
			}

			// 8. accept가 성공하면 epoll fd에 이벤트 등록
			// accept된 fd에 unix.EPOLLIN | unix.EPOLLHUP | unix.EPOLLERR에 대한 이벤트 트리거
			if err := unix.EpollCtl(epollfd, unix.EPOLL_CTL_ADD, nfd, &unix.EpollEvent{
				Fd:     int32(nfd),
				Events: unix.EPOLLIN | unix.EPOLLHUP | unix.EPOLLERR,
			}); err != nil {
				fmt.Println("epolladd error", err)
				continue
			}
		}
	}()

	go func() {
		events := make([]unix.EpollEvent, 128)
		for {
			// 9. epoll fd에서 발생하는 이벤트 대기
			n, err := unix.EpollWait(epollfd, events, -1)
			if err != nil {
				fmt.Println("epollwait error", err)
				continue
			}

			// 10. event 상태 체크
			b := make([]byte, 1024)
			for i := range n {
				event := events[i]
				var errfd error
				// 11. 일반 소켓에 대한 입력이 트리거된 경우 일반 로직 진행
				if event.Events&unix.EPOLLIN == unix.EPOLLIN {
					errfd = logic(int(event.Fd), b)
				}

				// 12. 소켓에 에러가 발생한 경우 에러로 취급
				if event.Events&(unix.EPOLLERR|unix.EPOLLHUP) != 0 {
					errfd = fmt.Errorf("epoll hup error")
				}

				// 13. epoll fd에서 소켓 제거
				if errfd != nil {
					fmt.Println("event fd error", errfd)
					unix.EpollCtl(epollfd, unix.EPOLL_CTL_DEL, int(event.Fd), &unix.EpollEvent{
						Fd:     int32(event.Fd),
						Events: 0,
					})
					unix.Close(int(event.Fd))
				}
			}
		}
	}()

	select {}
}

func logic(fd int, b []byte) error {
	r, err := unix.Read(fd, b)
	if err != nil {
		return err
	}

	if r == 0 {
		return fmt.Errorf("read EOF")
	}

	_, err = unix.Write(fd, b[:r])
	if err != nil {
		return err
	}
	return nil
}

두 코드의 차이점이 되는 부분을 중심으로 살펴보겠습니다.


6. epoll_create1

epoll을 커널에 생성 요청하는 부분입니다.

	// 6. epoll fd 생성
	epollfd, err := unix.EpollCreate1(0)
	if err != nil {
		fmt.Println("epollcreate error", err)
		return
	}

생성은 크게 볼 부분이 없습니다.

man epoll_create1

# NAME
#        epoll_create, epoll_create1 - open an epoll file descriptor
# 
# SYNOPSIS
#        #include <sys/epoll.h>
# 
#        int epoll_create(int size);
#        int epoll_create1(int flags);

epoll_create는 size라는 인자를 받았으나 리눅스 2.6.8버전부터 무시되기 때문에 사용되지 않습니다. epoll_create1에 인자로 0을 호출하면 size가 무시되는 epoll_create와 동일하게 epoll 인스턴스를 생성합니다. 단순히 epoll_create1(0)를 이용하여 epoll을 만든다고 보셔도 됩니다.


7~8. accept와 epoll_ctl

epoll을 사용하는 주요한 부분입니다.

	go func() {
		for {
			// 7. accept를 호출하여 클라이언트 연결 대기
			// Accept4호출 이유는 클라이언트측 소켓에 옵션을 부여하여 생성할 수 있기 때문
			// nfd, _, err := unix.Accept4(fd, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
			nfd, _, err := unix.Accept4(fd, unix.SOCK_CLOEXEC)
			if err != nil {
				fmt.Println("accept4 error", err)
				continue
			}

			// 8. accept가 성공하면 epoll fd에 이벤트 등록
			// accept된 fd에 unix.EPOLLIN | unix.EPOLLHUP | unix.EPOLLERR에 대한 이벤트 트리거
			if err := unix.EpollCtl(epollfd, unix.EPOLL_CTL_ADD, nfd, &unix.EpollEvent{
				Fd:     int32(nfd),
				Events: unix.EPOLLIN | unix.EPOLLHUP | unix.EPOLLERR,
			}); err != nil {
				fmt.Println("epolladd error", err)
				continue
			}
		}
	}()

이 부분에서 코드를 먼저 살펴보면 크게 두 부분으로 나눠져 있습니다.

  1. 클라이언트 연결을 accept4로 수신하는 부분
  2. epoll_ctl을 이용하여 accept4로 수신할 소켓 fd를 epoll에 등록하는 부분

1번 부분은 앞서 확인한 부분이고 2번 부분을 살펴보겠습니다. 먼저 epoll_ctl을 알아봅시다.

man epoll_ctl

# NAME
#        epoll_ctl - control interface for an epoll file descriptor
# 
# SYNOPSIS
#        #include <sys/epoll.h>
# 
#        int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

2번 부분 동작을 요약하면 epoll에 이벤트를 감지할 fd를 등록하는 것입니다. epoll_ctl의 인자를 확인해보면 epoll fd에 fd에 이벤트 epoll_event를 추가/수정/삭제합니다. 추가/수정/삭제 동작은 op으로 전달하는 인자로 결정됩니다.

  • EPOLL_CTL_ADD: 추가
  • EPOLL_CTL_MOD: 수정
  • EPOLL_CTL_DEL: 삭제

한편, event로 전달할 수 있는 종류는 다음이 있습니다.

  • EPOLLIN: 수신할 데이터 존재
  • EPOLLOUT: 송신 가능
  • EPOLLERR: 에러 발생
  • EPOLLET: 엣지 트리거 방식으로 설정(기본은 레벨)
  • EPOLLPRI: fd에 예외 상태 발생
  • EPOLLONESHOT: 한 번만 이벤트를 전달받음
  • EPOLLHUP: 연결 종료 이벤트 수신(hang-up)
  • EPOLLRDHUP: 연결 종료 이벤트 수신(half close 포함: 엣지 트리거 모니터링 사용 시 피어 종료를 감지하는 코드를 작성할 때 유용)
  • EPOLLEXCLUSIVE: 리눅스 4.5부터 지원합니다. 동일한 fd가 여러 epoll 인스턴스에 등록된 경우

다시 위에서 호출하는 함수형을 보면 epoll fd에 소켓 fd을 수신 데이터가 존재하거나(EPOLLIN), 에러가 발생했거나(EPOLLERR), 연결 종료 이벤트가 발생(EPOLLHUP)한 경우 epoll_wait시 리턴될 수 있음을 알려주고 있습니다.


9. epoll_wait

다음은 등록한 fd를 대기하는 부분입니다.

	go func() {
		events := make([]unix.EpollEvent, 128)
		for {
			// 9. epoll fd에서 발생하는 이벤트 대기
			n, err := unix.EpollWait(epollfd, events, -1)
			if err != nil {
				fmt.Println("epollwait error", err)
				continue
			}
			// ...

epoll_wait의 시그니처는 다음과 같습니다.

man epoll_wait

# SYNOPSIS
#        #include <sys/epoll.h>
# 
#        int epoll_wait(int epfd, struct epoll_event *events,
#                       int maxevents, int timeout);

epoll fd에 대하여 epoll_wait를 시도하면 이벤트 발생 조건에 맞는 fd의 리스트를 이벤트와 함께 epoll_event로 전달합니다. 이때 timeout값을 전달하는데 -1로 주면 이벤트가 발생할 때까지 무한히 대기합니다.

golang에서 이 함수는 epollfd와 EpollEvent의 배열과 시간 값을 전달해주면 발생한 이벤트의 개수와 에러를 리턴하는 형식으로 구현되어 있습니다. 소켓 fd가 클라이언트로부터 에러가 발생하거나 어떤 데이터가 존재한 경우 EpollWait의 블로킹이 해제되면서 사용자는 events에 담긴 fd를 제어하게 됩니다.


10~13. epoll_wait 이후에 대한 처리

다음은 마지막 부분으로 wait하고 리턴으로 수집된 fd에 대한 처리를 하는 부분입니다.

			// ...
			// 10. event 상태 체크
			b := make([]byte, 1024)
			for i := range n {
				event := events[i]
				var errfd error
				// 11. 일반 소켓에 대한 입력이 트리거된 경우 일반 로직 진행
				if event.Events&unix.EPOLLIN == unix.EPOLLIN {
					errfd = logic(int(event.Fd), b)
				}

				// 12. 소켓에 에러가 발생한 경우 에러로 취급
				if event.Events&(unix.EPOLLERR|unix.EPOLLHUP) != 0 {
					errfd = fmt.Errorf("epoll hup error")
				}

				// 13. epoll fd에서 소켓 제거
				if errfd != nil {
					fmt.Println("event fd error", errfd)
					unix.EpollCtl(epollfd, unix.EPOLL_CTL_DEL, int(event.Fd), &unix.EpollEvent{
						Fd:     int32(event.Fd),
						Events: 0,
					})
					unix.Close(int(event.Fd))
				}
			}
		}
	}()

이 부분에서 발생한 이벤트를 반복문을 통해 처리하게 됩니다. 반복문 내부에서 이벤트의 종류에 따라 소켓 입력인지, 에러 또는 종료인지 탐색합니다. 일반 소켓 입력이라면 입력 로직을 처리하는 11번 분기로 진입합니다. 이때 에러가 발생하거나 12번 이벤트가 발생한 경우, 마지막 13번에서 EpollCtl로 소켓 fd에 대한 이벤트 등록을 제거하고 종료합니다.


이번 아티클에서 golang unix 패키지의 epoll 관련 함수를 활용하여 입출력 다중화를 구현해보았습니다. epoll을 이용하면 기존 구현에서 클라이언트 소켓 연결마다 발생하는 고루틴의 개수를 획기적으로 줄일 수 있었습니다. 그 이유는 epoll의 동작과 관련이 있습니다. 위의 코드에서 다음과 같이 epoll을 이용한 echo서버 동작을 구현했습니다.

  1. epoll을 생성하고 연결되는 소켓을 epoll에 이벤트 조건과 함께 등록합니다.
  2. epoll_wait을 통해 그동안 발생한 이벤트를 대기합니다.
  3. 이벤트가 발생했다면 epoll_wait에서 블로킹이 해제되고 이벤트 처리 로직 구현에 따라 이벤트를 처리하게 됩니다.

지금까지 unix syscall과 epoll을 이용한 소켓 echo 서버를 구현해보았습니다. 다음에는 golang 네트워크 기반인 netpoller의 내부 구현에 대하여 살펴보겠습니다.

코드입니다.

https://github.com/atgane/syncgo/blob/c32745b83be1ed4e879a4e46172aa2c6c860d5c0/04/main.go#L1

Leave a comment