4 minute read

해당 아티클은 다음 수준의 지식을 요구합니다.

  • POSIX 네트워크 syscall
  • epoll

이전 아티클 “golang 비동기. tcp 서버(5). netpoller 2부. golang 내부 epoll 활용: Accept”를 보시는 걸 권장합니다. 이번에는 read와 write하는 부분이 어떻게 일어나는지 추적해보겠습니다.

예시로 살펴보는 지점은 다음 코드의 ✅ 부분입니다.

func main() {
	listener, err := net.Listen("tcp", ":8989")
	if err != nil {
		fmt.Println("listen error", err)
		return
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println("accept error", err)
			continue
		}
		go func() {
			b := make([]byte, 4096)
			for {
				// ✅ read 내부
				n, err := conn.Read(b)
				if err != nil {
					fmt.Println("read error", n)
					return
				}

				// ✅ write 내부
				if _, err := conn.Write(b); err != nil {
					fmt.Println("write error", n)
					return
				}
			}
		}()
	}
}

1. Read 내부

(net.Conn).Read가 진행되는 과정은 Accept가 진행되는 과정과 매우 유사합니다. Read또한 FD의 Read부터 코드를 확인해보겠습니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/internal/poll/fd_unix.go#L140
func (fd *FD) Read(p []byte) (int, error) {
	// ✅ readLock을 획득: 하나의 conn에서 여러 고루틴이 Read를 호출해도
	// 하나의 고루틴만 로직이 진행
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()
	
	// 바이트 배열의 크기를 0으로 주었다면 무시
	if len(p) == 0 {
		// ...
		return 0, nil
	}
	
	// Read 준비
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	
	// ...

	for {
		// ✅ read syscall 호출
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			
			// ✅ 만약 non-blocking에 의한 데이터가 도착하지 않아 읽을 수 없는 상태라면
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		// ✅ 읽은 데이터가 존재하는 경우 바로 return 절로 이동
		err = fd.eofError(n, err)
		return n, err
	}
}

Accept와 비교해보면 매우 유사한 점이 많습니다. 함수 동작을 요약하면 다음과 같습니다.

  1. readLock을 획득합니다. 따라서 같은 fd에서 접근하는 Read요청에 대하여 하나의 고루틴만 진입하도록 합니다.
  2. Read를 준비합니다. 여기서 Read란 소켓으로부터 수신된 데이터가 존재하는 경우를 의미합니다.
  3. read를 호출합니다. 논블로킹 소켓이기 때문에 바로 리턴됩니다.
  4. 만약 읽은 데이터가 존재하는 경우 바로 읽은 바이트 배열과 함께 리턴됩니다.
  5. 만약 읽은 데이터가 존재하지 않는 경우 EAGAIN 분기로 처리됩니다. 해당 분기에서 fd.pd.waitRead를 호출하여 데이터가 존재할 때까지 고루틴을 대기합니다.

2. Write 내부

write 과정도 마찬가지로 *FD.Write부터 코드를 확인하겠습니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/internal/poll/fd_unix.go#L366
func (fd *FD) Write(p []byte) (int, error) {
	// ✅ writeLock을 획득: 하나의 conn에서 여러 고루틴이 Write를 호출해도
	// 하나의 고루틴만 로직이 진행
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()
	
	// Write 준비
	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}
	
	// ✅ 지금까지 써진 데이터
	var nn int
	for {
		max := len(p)
		if fd.IsStream && max-nn > maxRW {
			max = nn + maxRW
		}
		
		// ✅ write syscall 호출
		n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
		if n > 0 {
			// ✅ 써진 데이터를 누적시킴
			nn += n
		}
		
		// ✅ 만약 쓰인 데이터가 입력 데이터 크기와 같은 경우 종료
		if nn == len(p) {
			return nn, err
		}
		
		// ✅ write를 일시적으로 할 수 없는 상태인 경우 write 고루틴이 활성화될 때까지 대기
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		
		// ✅ 에러 및 EOF 처리
		if err != nil {
			return nn, err
		}
		if n == 0 {
			return nn, io.ErrUnexpectedEOF
		}
	}
}

write 또한 read 과정과 유사합니다.

  1. writeLock을 획득합니다. 따라서 같은 fd에서 접근하는 Write요청에 대하여 하나의 고루틴만 진입하도록 합니다.
  2. Write를 준비합니다. 여기서 Write란 소켓으로부터 송신할 데이터가 존재하는 경우를 의미합니다.
  3. write를 호출합니다. 논블로킹 소켓이기 때문에 바로 리턴됩니다.
  4. 쓴 데이터를 전체에서 얼마만큼 썼는지 누적 길이 nn에 저장합니다.
  5. 만약 누적 길이 nn이 쓴 데이터의 크기와 같다면 리턴합니다.
  6. 당장 데이터를 쓸 수 없는 에러가 발생한 경우 fd.pd.waitWrite를 호출하여 다시 쓸 수 있는 상태가 될 때까지 고루틴을 대기시킵니다.
  7. 에러가 존재하는 경우, 기존에 썼던 크기와 함께 에러를 리턴합니다.

3. 비활성화 & 활성화 로직

비활성화 & 활성화 로직은 accept 과정과 유사하게 동작합니다.

비활성화 과정에서 read와 write 모두 waitRead, waitWrite를 호출하여 netpollblock 내에서 고루틴이 대기하게 됩니다. 고루틴이 대기할 때 gopark에서 최종으로 호출하는 netpollblockcommit 함수 내에서 고루틴의 정보를 저장합니다. 읽기와 쓰기는 pollDesc 내부에서 rg, wg 중 어느 것을 활용하는지 정도의 차이가 존재합니다.

읽기와 쓰기는 따로 이벤트가 관리됩니다. 이 이유는 쓰기와 읽기 중 하나만 일시적인 에러이고 다른 하나는 정상으로 동작하는 상태일 수 있기 때문입니다. 이에 대한 코드는 앞에서도 확인했지만 netpoll 코드에서 볼 수 있습니다.

// https://github.com/golang/go/blob/a10e42f219abb9c5b/src/runtime/netpoll_epoll.go#L98
func netpoll(delay int64) (gList, int32) {
	// ...
	
	// ✅ epollwait로 이벤트 수신 대기
	n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
	
	for i := int32(0); i < n; i++ {
		ev := events[i]
		
		// ...
		
		// ✅ 일치하는 이벤트에 따라 분기 처리
		if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'w'
		}
		
		// ✅ mode가 존재하는 경우 netpollready에서 고루틴 활성화
		if mode != 0 {
			tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
			pd := (*pollDesc)(tp.pointer())
			tag := tp.tag()
			if pd.fdseq.Load() == tag {
				pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
				delta += netpollready(&toRun, pd, mode)
			}
		}
	}
	return toRun, delta
}

이 함수가 호출되는 방식 또한 accept에서 확인한 것과 동일하게 다음과 같이 활성화됩니다.

  1. netpoll은 sysmon, findRunnable등 고루틴 런타임 내부 함수로부터 호출됩니다.
  2. netpoll호출 시 epollwait로 리턴받은 이벤트 리스트로 고루틴 리스트를 추출합니다.
  3. sysmon은 grq에, findRunnable은 rlq에 고루틴을 저장합니다.

여기까지 netpoll을 이용한 golang의 소켓 R/W 동작을 살펴봤습니다. R/W 과정은 대체로 유사하게 동작하며 “syscall 호출” → “EAGAIN이면 고루틴 대기” → “외부에서 활성화 호출하여 대기가 끝난 고루틴 해제” 과정을 거친다는 것을 알아봤습니다. 이후 고루틴 해제 과정이 지나면 런타임의 runnable queue에 실행 대기 상태의 고루틴을 넣어 쓰레드 위에서 고루틴이 실행될 수 있도록 합니다.

Leave a comment