5 minute read

다시 application controller로 돌아와서 CompareAppState()를 살펴보겠습니다. 이 함수는 원격 저장소와 대상 클러스터 사이의 차이를 계산해 어떤 리소스를 다시 맞춰야 하는지 정하는 역할을 했습니다. 이번에는 그 결과를 바탕으로 application controller가 실제 Kubernetes 리소스를 동기화하는 과정을 따라가 보겠습니다.


refresh queue에서 operation queue로

지금까지는 application 상태를 가져와 원격 저장소와 비교하고, sync 대상이 무엇인지 계산하는 흐름까지 확인했습니다. 이제는 그 결과를 바탕으로 실제 Kubernetes 리소스가 어떻게 동기화되는지 살펴보겠습니다.

다시 processAppRefreshQueueItem()을 보면 함수가 끝날 때마다 실행되는 defer 절이 있습니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L1541-L1557
func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
	
	// ...
	
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
		}
		// We want to have app operation update happen after the sync, so there's no race condition
		// and app updates not proceeding. See https://github.com/argoproj/argo-cd/issues/18500.
		// ✅ 이 부분에서 refresh queue -> operation queue로 넘어감
		ctrl.appOperationQueue.AddRateLimited(appKey)
		ctrl.appRefreshQueue.Done(appKey)
	}()
	
	// ...

processAppRefreshQueueItem()이 끝나면 위 defer 절이 실행되고, 이때 ctrl.appOperationQueue.AddRateLimited(appKey)가 호출되면서 작업이 refresh queue에서 operation queue로 넘어갑니다. 즉 refresh 단계에서 비교를 마친 application key가 다음 처리 단계로 전달됩니다. 이 key를 실제로 소비하는 함수는 processAppOperationQueueItem()입니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L934-L988
func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext bool) {
	// ✅ queue data 가져오기
	appKey, shutdown := ctrl.appOperationQueue.Get()
	
	// ...
	
	// ✅ informer에서 object 조회
	obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
	
	// ...

	// ✅ operation이 nil이 아닐 때
	if app.Operation != nil {
		ctrl.processRequestedAppOperation(app)
		ts.AddCheckpoint("process_requested_app_operation_ms")
	}
	
	// ...
	
}

operation queue에서는 appKey로 application 오브젝트를 다시 조회합니다. 그리고 app.Operation이 남아 있으면 processRequestedAppOperation()으로 넘겨 실제 동기화 단계를 진행합니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L1317-L1382
func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {

	// ...

	if isOperationInProgress(app) {
		// 이미 처리중인 경우 종료가 아니면 retry 로직
	} else {
		
		// app phase를 operation running으로 변경
		state = &appv1.OperationState{Phase: synccommon.OperationRunning, Operation: *app.Operation, StartedAt: metav1.Now()}
		ctrl.setOperationState(app, state)
		// ...
	}

	// 목적지 검증 후
	if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
		
		// ...
		
		// ✅ 에러가 없는 경우 SyncAppState 호출
	} else {
		ctrl.appStateManager.SyncAppState(app, state)
	}
	
	// ...
	// app state 재검사
	
}

이 함수의 큰 흐름은 다음과 같습니다.

  • operation state 초기화 또는 재시도 상태 복구
  • 대상 클러스터 검증
  • SyncAppState() 호출
  • sync 이후 application 상태 후처리

즉 실제 Kubernetes 리소스를 맞추는 핵심 진입점은 SyncAppState()입니다.

SyncAppState에서 sync context 생성

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/sync.go#L90-L227
func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha1.OperationState) {
	// ...

	// ✅ CompareAppState 재호출
	compareResult, err := m.CompareAppState(app, proj, revisions, sources, false, true, syncOp.Manifests, isMultiSourceRevision, rollback)

	// ...

	// ✅ 클러스터 조회
	clst, err := m.db.GetCluster(context.Background(), app.Spec.Destination.Server)
	
	// ...

}
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/sync.go#L370-L393
func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha1.OperationState) {
	// ...
	
	// ✅ syncCtx 생성
	syncCtx, cleanup, err := sync.NewSyncContext(
		// ...
	)
	
	// ...

	if state.Phase == common.OperationTerminating {
		syncCtx.Terminate()
	} else {
		// ✅ terminating이 아니면 Sync 진행
		syncCtx.Sync()
	}
	
	// ...
}

SyncAppState() 내부에서는 크게 다음 순서가 실행됩니다.

  • CompareAppState()를 한 번 더 호출해 비교 결과를 다시 계산
  • 클러스터 조회
  • sync.NewSyncContext()로 sync context 생성
  • syncCtx.Sync() 호출

여기서 실제 동기화를 수행하는 주체는 syncCtx.Sync()입니다. sync.NewSyncContext()는 Argo CD 저장소 내부 구현이 아니라 gitops-engine 패키지에서 제공하는 코드입니다.

gitops-engine은 Argo CD와 Flux CD가 함께 사용하던 GitOps 엔진입니다. 자세한 배경은 gitops-engine FAQ에서 확인할 수 있습니다.

gitops-engine의 Sync 실행

gitops-engine의 Sync() 메서드는 다음과 같이 동작합니다.

// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L394-L533
func (sc *syncContext) Sync() {
	// ...

	if sc.started() {
		// ...
	} else {
	
		// ...
		
		// ✅ dry run으로 사전 동작 체크
		dryRunTasks := tasks
		if sc.applyOutOfSyncOnly {
			dryRunTasks = sc.filterOutOfSyncTasks(tasks)
		}

		sc.log.WithValues("tasks", dryRunTasks).Info("Tasks (dry-run)")
		if sc.runTasks(dryRunTasks, true) == failed {
			sc.setOperationPhase(common.OperationFailed, "one or more objects failed to apply (dry run)")
			return
		}
	}

	// ...
	// sync 시 hook & phase 설정
	// ...
	
	// ✅ 실제 kubernetes apply 실행
	runState := sc.runTasks(tasks, false)

	// runState에 대한 상태 분기 처리
}

Sync()는 먼저 dry-run으로 리소스 검증을 수행하고, 문제가 없을 때 실제 runTasks()를 호출합니다. 실제 리소스 적용 경로는 runTasks()에서 이어집니다. 여기서는 생성 관련 흐름만 보겠습니다.

// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L1155-L1262
func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
	
	// ...

	// tasks에 대한 분리 작업
	for _, task := range tasks {
		if task.isPrune() {
			pruneTasks = append(pruneTasks, task)
		} else {
			createTasks = append(createTasks, task)
		}
	}
	
	// prune 처리
	// ...

	// delete 처리
	// ...

	var tasksGroup syncTasks

	// ...
	
	if len(tasksGroup) > 0 {
		// ✅ 리소스 생성 처리
		state = sc.processCreateTasks(state, tasksGroup, dryRun)
	}
	
	return state
}

// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L1265-L1285
func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRun bool) runState {
	ss := newStateSync(state)
	for _, task := range tasks {
		// dryRun인 경우 스킵
		if dryRun && task.skipDryRun {
			continue
		}
		t := task
		ss.Go(func(state runState) runState {
			
			// ...
			
			// ✅ 고루틴으로 applyObject로 리소스 생성 호출
			result, message := sc.applyObject(t, dryRun, validate)
			
			// ...
			
			return state
		})
	}
	return ss.Wait()
}

// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L968-L1015
func (sc *syncContext) applyObject(t *syncTask, dryRun, validate bool) (common.ResultCode, string) {
	
	// ...
	
	if shouldReplace { // ✅ replace인 경우
		if t.liveObj != nil { // ✅ 이미 리소스가 있을 때
			// ...
			// crd나 ns kind인 경우: kubectl replace 대신 update resource
			if kube.IsCRD(t.targetObj) || t.targetObj.GetKind() == kubeutil.NamespaceKind {
				
				// ...
				
				_, err = sc.resourceOps.UpdateResource(context.TODO(), update, dryRunStrategy)
				
				// ...
				
			} else {
				// ✅ crd, ns kind가 아니면 replace
				message, err = sc.resourceOps.ReplaceResource(context.TODO(), t.targetObj, dryRunStrategy, force)
			}
		} else {
			// ✅ 리소스 없으면 create
			message, err = sc.resourceOps.CreateResource(context.TODO(), t.targetObj, dryRunStrategy, validate)
		}
	} else { // ✅ replace가 아니면 apply
		message, err = sc.resourceOps.ApplyResource(context.TODO(), t.targetObj, dryRunStrategy, force, validate, serverSideApply, sc.serverSideApplyManager, false)
	}
	
	// ...
	
	// ✅ crd인 경우 crd 대기
	if kube.IsCRD(t.targetObj) && !dryRun {
		crdName := t.targetObj.GetName()
		if err = sc.ensureCRDReady(crdName); err != nil {
			sc.log.Error(err, fmt.Sprintf("failed to ensure that CRD %s is ready", crdName))
		}
	}
	return common.ResultCodeSynced, message
}

runTasks() 내부에는 create뿐 아니라 prune과 delete 작업도 함께 들어 있습니다. 이 중 create 경로에서는 processCreateTasks()를 호출해 여러 task를 goroutine으로 병렬 처리하고, 각 task는 applyObject()로 전달됩니다.

applyObject() 내부에서는 다음 기준으로 분기합니다.

  • replace 옵션이 있는지
  • live object가 이미 존재하는지
  • 대상이 CRD 또는 Namespace인지

특히 CRD와 Namespace는 kubectl replace가 자칫 재생성으로 이어질 수 있으므로, 일반 replace 대신 update 경로로 분기해 더 안전하게 처리합니다.


정리

지금까지 흐름을 정리하면, Argo CD에서 application 차이를 실제 리소스 동기화로 연결하는 과정은 다음과 같습니다.

  1. CLI나 API server를 통해 생성된 application 변경이 controller에 전달됩니다.
  2. refresh queue에서 CompareAppState()가 실행되어 원하는 상태와 현재 상태의 차이를 계산합니다.
  3. refresh 처리가 끝나면 application key가 operation queue로 넘어갑니다.
  4. operation queue는 processRequestedAppOperation()을 통해 sync 요청을 실제 실행 단계로 넘깁니다.
  5. SyncAppState()는 비교 결과를 다시 확인하고 대상 클러스터 정보를 가져온 뒤 sync context를 생성합니다.
  6. gitops-engine의 Sync()는 먼저 dry-run으로 검증하고, 이어서 runTasks()로 실제 동기화를 수행합니다.
  7. runTasks()는 prune, delete, create task를 구분하고 create 경로에서는 applyObject()를 통해 리소스를 적용합니다.
  8. applyObject()는 replace, create, apply를 리소스 상태와 종류에 따라 분기하고, CRD는 적용 뒤 준비 상태까지 확인합니다.

Leave a comment