argocd 분석: app controller의 k8s 리소스 sync
다시 application controller로 돌아와서 CompareAppState()를 살펴보겠습니다. 이 함수는 원격 저장소와 대상 클러스터 사이의 차이를 계산해 어떤 리소스를 다시 맞춰야 하는지 정하는 역할을 했습니다. 이번에는 그 결과를 바탕으로 application controller가 실제 Kubernetes 리소스를 동기화하는 과정을 따라가 보겠습니다.
refresh queue에서 operation queue로
지금까지는 application 상태를 가져와 원격 저장소와 비교하고, sync 대상이 무엇인지 계산하는 흐름까지 확인했습니다. 이제는 그 결과를 바탕으로 실제 Kubernetes 리소스가 어떻게 동기화되는지 살펴보겠습니다.
다시 processAppRefreshQueueItem()을 보면 함수가 끝날 때마다 실행되는 defer 절이 있습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L1541-L1557
func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
// ...
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
// We want to have app operation update happen after the sync, so there's no race condition
// and app updates not proceeding. See https://github.com/argoproj/argo-cd/issues/18500.
// ✅ 이 부분에서 refresh queue -> operation queue로 넘어감
ctrl.appOperationQueue.AddRateLimited(appKey)
ctrl.appRefreshQueue.Done(appKey)
}()
// ...
processAppRefreshQueueItem()이 끝나면 위 defer 절이 실행되고, 이때 ctrl.appOperationQueue.AddRateLimited(appKey)가 호출되면서 작업이 refresh queue에서 operation queue로 넘어갑니다. 즉 refresh 단계에서 비교를 마친 application key가 다음 처리 단계로 전달됩니다. 이 key를 실제로 소비하는 함수는 processAppOperationQueueItem()입니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L934-L988
func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext bool) {
// ✅ queue data 가져오기
appKey, shutdown := ctrl.appOperationQueue.Get()
// ...
// ✅ informer에서 object 조회
obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
// ...
// ✅ operation이 nil이 아닐 때
if app.Operation != nil {
ctrl.processRequestedAppOperation(app)
ts.AddCheckpoint("process_requested_app_operation_ms")
}
// ...
}
operation queue에서는 appKey로 application 오브젝트를 다시 조회합니다. 그리고 app.Operation이 남아 있으면 processRequestedAppOperation()으로 넘겨 실제 동기화 단계를 진행합니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L1317-L1382
func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {
// ...
if isOperationInProgress(app) {
// 이미 처리중인 경우 종료가 아니면 retry 로직
} else {
// app phase를 operation running으로 변경
state = &appv1.OperationState{Phase: synccommon.OperationRunning, Operation: *app.Operation, StartedAt: metav1.Now()}
ctrl.setOperationState(app, state)
// ...
}
// 목적지 검증 후
if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
// ...
// ✅ 에러가 없는 경우 SyncAppState 호출
} else {
ctrl.appStateManager.SyncAppState(app, state)
}
// ...
// app state 재검사
}
이 함수의 큰 흐름은 다음과 같습니다.
- operation state 초기화 또는 재시도 상태 복구
- 대상 클러스터 검증
SyncAppState()호출- sync 이후 application 상태 후처리
즉 실제 Kubernetes 리소스를 맞추는 핵심 진입점은 SyncAppState()입니다.
SyncAppState에서 sync context 생성
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/sync.go#L90-L227
func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha1.OperationState) {
// ...
// ✅ CompareAppState 재호출
compareResult, err := m.CompareAppState(app, proj, revisions, sources, false, true, syncOp.Manifests, isMultiSourceRevision, rollback)
// ...
// ✅ 클러스터 조회
clst, err := m.db.GetCluster(context.Background(), app.Spec.Destination.Server)
// ...
}
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/sync.go#L370-L393
func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha1.OperationState) {
// ...
// ✅ syncCtx 생성
syncCtx, cleanup, err := sync.NewSyncContext(
// ...
)
// ...
if state.Phase == common.OperationTerminating {
syncCtx.Terminate()
} else {
// ✅ terminating이 아니면 Sync 진행
syncCtx.Sync()
}
// ...
}
SyncAppState() 내부에서는 크게 다음 순서가 실행됩니다.
CompareAppState()를 한 번 더 호출해 비교 결과를 다시 계산- 클러스터 조회
sync.NewSyncContext()로 sync context 생성syncCtx.Sync()호출
여기서 실제 동기화를 수행하는 주체는 syncCtx.Sync()입니다. sync.NewSyncContext()는 Argo CD 저장소 내부 구현이 아니라 gitops-engine 패키지에서 제공하는 코드입니다.
gitops-engine은 Argo CD와 Flux CD가 함께 사용하던 GitOps 엔진입니다. 자세한 배경은 gitops-engine FAQ에서 확인할 수 있습니다.
gitops-engine의 Sync 실행
gitops-engine의 Sync() 메서드는 다음과 같이 동작합니다.
// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L394-L533
func (sc *syncContext) Sync() {
// ...
if sc.started() {
// ...
} else {
// ...
// ✅ dry run으로 사전 동작 체크
dryRunTasks := tasks
if sc.applyOutOfSyncOnly {
dryRunTasks = sc.filterOutOfSyncTasks(tasks)
}
sc.log.WithValues("tasks", dryRunTasks).Info("Tasks (dry-run)")
if sc.runTasks(dryRunTasks, true) == failed {
sc.setOperationPhase(common.OperationFailed, "one or more objects failed to apply (dry run)")
return
}
}
// ...
// sync 시 hook & phase 설정
// ...
// ✅ 실제 kubernetes apply 실행
runState := sc.runTasks(tasks, false)
// runState에 대한 상태 분기 처리
}
Sync()는 먼저 dry-run으로 리소스 검증을 수행하고, 문제가 없을 때 실제 runTasks()를 호출합니다. 실제 리소스 적용 경로는 runTasks()에서 이어집니다. 여기서는 생성 관련 흐름만 보겠습니다.
// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L1155-L1262
func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
// ...
// tasks에 대한 분리 작업
for _, task := range tasks {
if task.isPrune() {
pruneTasks = append(pruneTasks, task)
} else {
createTasks = append(createTasks, task)
}
}
// prune 처리
// ...
// delete 처리
// ...
var tasksGroup syncTasks
// ...
if len(tasksGroup) > 0 {
// ✅ 리소스 생성 처리
state = sc.processCreateTasks(state, tasksGroup, dryRun)
}
return state
}
// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L1265-L1285
func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRun bool) runState {
ss := newStateSync(state)
for _, task := range tasks {
// dryRun인 경우 스킵
if dryRun && task.skipDryRun {
continue
}
t := task
ss.Go(func(state runState) runState {
// ...
// ✅ 고루틴으로 applyObject로 리소스 생성 호출
result, message := sc.applyObject(t, dryRun, validate)
// ...
return state
})
}
return ss.Wait()
}
// https://github.com/argoproj/gitops-engine/blob/54992bf42431e71f71f11647e82105530e56305e/pkg/sync/sync_context.go#L968-L1015
func (sc *syncContext) applyObject(t *syncTask, dryRun, validate bool) (common.ResultCode, string) {
// ...
if shouldReplace { // ✅ replace인 경우
if t.liveObj != nil { // ✅ 이미 리소스가 있을 때
// ...
// crd나 ns kind인 경우: kubectl replace 대신 update resource
if kube.IsCRD(t.targetObj) || t.targetObj.GetKind() == kubeutil.NamespaceKind {
// ...
_, err = sc.resourceOps.UpdateResource(context.TODO(), update, dryRunStrategy)
// ...
} else {
// ✅ crd, ns kind가 아니면 replace
message, err = sc.resourceOps.ReplaceResource(context.TODO(), t.targetObj, dryRunStrategy, force)
}
} else {
// ✅ 리소스 없으면 create
message, err = sc.resourceOps.CreateResource(context.TODO(), t.targetObj, dryRunStrategy, validate)
}
} else { // ✅ replace가 아니면 apply
message, err = sc.resourceOps.ApplyResource(context.TODO(), t.targetObj, dryRunStrategy, force, validate, serverSideApply, sc.serverSideApplyManager, false)
}
// ...
// ✅ crd인 경우 crd 대기
if kube.IsCRD(t.targetObj) && !dryRun {
crdName := t.targetObj.GetName()
if err = sc.ensureCRDReady(crdName); err != nil {
sc.log.Error(err, fmt.Sprintf("failed to ensure that CRD %s is ready", crdName))
}
}
return common.ResultCodeSynced, message
}
runTasks() 내부에는 create뿐 아니라 prune과 delete 작업도 함께 들어 있습니다. 이 중 create 경로에서는 processCreateTasks()를 호출해 여러 task를 goroutine으로 병렬 처리하고, 각 task는 applyObject()로 전달됩니다.
applyObject() 내부에서는 다음 기준으로 분기합니다.
- replace 옵션이 있는지
- live object가 이미 존재하는지
- 대상이 CRD 또는 Namespace인지
특히 CRD와 Namespace는 kubectl replace가 자칫 재생성으로 이어질 수 있으므로, 일반 replace 대신 update 경로로 분기해 더 안전하게 처리합니다.
정리
지금까지 흐름을 정리하면, Argo CD에서 application 차이를 실제 리소스 동기화로 연결하는 과정은 다음과 같습니다.
- CLI나 API server를 통해 생성된 application 변경이 controller에 전달됩니다.
- refresh queue에서
CompareAppState()가 실행되어 원하는 상태와 현재 상태의 차이를 계산합니다. - refresh 처리가 끝나면 application key가 operation queue로 넘어갑니다.
- operation queue는
processRequestedAppOperation()을 통해 sync 요청을 실제 실행 단계로 넘깁니다. SyncAppState()는 비교 결과를 다시 확인하고 대상 클러스터 정보를 가져온 뒤 sync context를 생성합니다.- gitops-engine의
Sync()는 먼저 dry-run으로 검증하고, 이어서runTasks()로 실제 동기화를 수행합니다. runTasks()는 prune, delete, create task를 구분하고 create 경로에서는applyObject()를 통해 리소스를 적용합니다.applyObject()는 replace, create, apply를 리소스 상태와 종류에 따라 분기하고, CRD는 적용 뒤 준비 상태까지 확인합니다.
Leave a comment