argocd 분석: argocd가 다른 클러스터를 모니터링하는 방법: LiveStateCache
저번 글에서는 gitops-engine 내부에서 대상 클러스터에 배포된 애플리케이션 상태를 어떻게 모니터링하는지 살펴봤습니다. ClusterCache가 대상 클러스터의 오브젝트 타입을 디스커버리하고, 리소스 변화가 발생했을 때 이벤트를 발생시키는 방식으로 동작한다는 점을 확인했습니다. 이번 글에서는 그다음 단계로, liveStateCache가 ClusterCache를 어떻게 생성하고 동기화하는지, 그리고 감지한 이벤트를 ApplicationController까지 어떻게 전달하는지 순서대로 살펴보겠습니다.
LiveStateCache에서 클러스터 조회하기
argocd는 LiveStateCache라는 인터페이스를 통해 클러스터 정보를 조회합니다. 실제 구현체는 liveStateCache이며, 인터페이스와 구조체를 함께 보면 역할이 더 분명하게 보입니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/cache/cache.go#L119-L142
type LiveStateCache interface {
// Returns k8s server version
GetVersionsInfo(serverURL string) (string, []kube.APIResourceInfo, error)
// Returns true of given group kind is a namespaced resource
IsNamespaced(server string, gk schema.GroupKind) (bool, error)
// Returns synced cluster cache
GetClusterCache(server string) (clustercache.ClusterCache, error)
// Executes give callback against resource specified by the key and all its children
IterateHierarchy(server string, key kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
// Executes give callback against resources specified by the keys and all its children
IterateHierarchyV2(server string, keys []kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
// Returns state of live nodes which correspond for target nodes of specified application.
GetManagedLiveObjs(a *appv1.Application, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error)
// IterateResources iterates all resource stored in cache
IterateResources(server string, callback func(res *clustercache.Resource, info *ResourceInfo)) error
// Returns all top level resources (resources without owner references) of a specified namespace
GetNamespaceTopLevelResources(server string, namespace string) (map[kube.ResourceKey]appv1.ResourceNode, error)
// Starts watching resources of each controlled cluster.
Run(ctx context.Context) error
// Returns information about monitored clusters
GetClustersInfo() []clustercache.ClusterInfo
// Init must be executed before cache can be used
Init() error
}
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/cache/cache.go#L208-L222
type liveStateCache struct {
db db.ArgoDB
appInformer cache.SharedIndexInformer
onObjectUpdated ObjectUpdatedHandler
kubectl kube.Kubectl
settingsMgr *settings.SettingsManager
metricsServer *metrics.MetricsServer
clusterSharding sharding.ClusterShardingCache
resourceTracking argo.ResourceTracking
ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts
clusters map[string]clustercache.ClusterCache
cacheSettings cacheSettings
lock sync.RWMutex
}
여기서 핵심은 liveStateCache가 clusters 필드에, 이전 글에서 살펴본 gitops-engine의 clustercache.ClusterCache를 맵 형태로 보관한다는 점입니다.
liveStateCache에는 getCluster 메서드가 있고, 이 메서드가 ClusterCache를 생성한 뒤 clusters 필드에 저장합니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/cache/cache.go#L457-L610
func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, error) {
// ...
clusterCache, ok = c.clusters[server]
if ok {
return clusterCache, nil
}
// cluster config & 옵션 생성
clusterCacheConfig, err := cluster.RESTConfig()
clusterCacheOpts := []clustercache.UpdateSettingsFunc{ // ...
}
// ✅ clusterCache 생성
clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...)
// ✅ 리소스 업데이트 함수 등록
_ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) { // ...
}
// ✅ 업데이트 함수 등록
_ = clusterCache.OnEvent(func(event watch.EventType, un *unstructured.Unstructured) { // ...
}
// ...
// ✅ 클러스터 등록
c.clusters[server] = clusterCache
return clusterCache, nil
}
메서드의 역할을 정리하면 다음과 같습니다. 먼저 server 값을 기준으로 clusters 맵을 조회하고, 이미 존재하면 바로 반환합니다. 아직 없으면 새로운 ClusterCache를 만들고 gitops-engine의 ClusterCache에 이벤트 핸들러를 등록한 뒤 캐시에 저장합니다.
getCluster는 private 메서드이므로 외부에서 직접 호출하지 않습니다. 대신 getSyncedCluster가 이를 감싼 뒤 EnsureSynced까지 수행합니다. 코드를 보면 다음과 같습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/cache/cache.go#L613-L623
func (c *liveStateCache) getSyncedCluster(server string) (clustercache.ClusterCache, error) {
clusterCache, err := c.getCluster(server)
if err != nil {
return nil, fmt.Errorf("error getting cluster: %w", err)
}
err = clusterCache.EnsureSynced()
if err != nil {
return nil, fmt.Errorf("error synchronizing cache state : %w", err)
}
return clusterCache, nil
}
getSyncedCluster는 ClusterCache를 가져온 뒤 EnsureSynced를 호출합니다. 따라서 이 함수가 호출되는 시점에 argocd는 대상 클러스터의 최신 상태를 확보하게 됩니다. 이 메서드는 liveStateCache의 여러 public 메서드에서 공통으로 사용됩니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/cache/cache.go#L638-L710
func (c *liveStateCache) IsNamespaced(server string, gk schema.GroupKind) (bool, error)
func (c *liveStateCache) IterateHierarchy(server string, key kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
func (c *liveStateCache) IterateHierarchyV2(server string, keys []kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
func (c *liveStateCache) IterateResources(server string, callback func(res *clustercache.Resource, info *ResourceInfo)) error
func (c *liveStateCache) GetNamespaceTopLevelResources(server string, namespace string) (map[kube.ResourceKey]appv1.ResourceNode, error)
func (c *liveStateCache) GetManagedLiveObjs(a *appv1.Application, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error)
func (c *liveStateCache) GetVersionsInfo(serverURL string) (string, []kube.APIResourceInfo, error)
예를 들어 IsNamespaced는 appStateManager의 CompareAppState에서 호출됩니다. 또한 IterateHierarchyV2, GetNamespaceTopLevelResources는 ApplicationController의 getResourceTree에서 호출됩니다. 즉 getSyncedCluster는 여러 경로에서 재사용되지만, 큰 흐름으로 보면 ApplicationController가 클러스터 리소스 상태를 읽어야 하는 시점마다 호출된다고 볼 수 있습니다.
이제 조회 경로를 확인했으니, 이어서 liveStateCache가 ClusterCache에서 전달된 이벤트를 어떻게 처리하는지 살펴보겠습니다.
LiveStateCache에서 이벤트 처리하기
이전 글에서 gitops-engine의 ClusterCache는 Kubernetes 오브젝트의 변화를 Watcher로 감지하고, 외부에서 등록한 핸들러를 통해 상위 레이어로 이벤트를 전달할 수 있다는 점을 확인했습니다. 이번에는 liveStateCache가 그 이벤트를 받아서 어떻게 처리하는지 보겠습니다. 이를 위해 다시 getCluster 메서드를 살펴보겠습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/cache/cache.go#L558-L606
func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, error) {
// ...
_ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) {
toNotify := make(map[string]bool)
var ref v1.ObjectReference // 변경된 리소스 정보 저장
if newRes != nil {
ref = newRes.Ref
} else {
ref = oldRes.Ref
}
c.lock.RLock()
cacheSettings := c.cacheSettings
c.lock.RUnlock()
// 불필요한 업데이트 무시: 이전 / 이후 리소스가 모두 존재할 때 skipResourceUpdate가 참이면 무시
if cacheSettings.ignoreResourceUpdatesEnabled && oldRes != nil && newRes != nil && skipResourceUpdate(resInfo(oldRes), resInfo(newRes)) {
// Additional check for debug level so we don't need to evaluate the
// format string in case of non-debug scenarios
if log.GetLevel() >= log.DebugLevel {
namespace := ref.Namespace
if ref.Namespace == "" {
namespace = "(cluster-scoped)"
}
log.WithFields(log.Fields{
"server": cluster.Server,
"namespace": namespace,
"name": ref.Name,
"api-version": ref.APIVersion,
"kind": ref.Kind,
}).Debug("Ignoring change of object because none of the watched resource fields have changed")
}
return
}
for _, r := range []*clustercache.Resource{newRes, oldRes} {
if r == nil {
continue
}
// ✅ 리소스에 대한 app을 가져옴
app := getApp(r, namespaceResources)
if app == "" || skipAppRequeuing(r.ResourceKey()) {
continue
}
// ✅ toNotify에 application 등록: toNotify는 크기가 거의 1
toNotify[app] = isRootAppNode(r) || toNotify[app]
}
// ✅ liveStateCache의 onObjectUpdated 호출
c.onObjectUpdated(toNotify, ref)
})
// ✅ OnEvent에서는 메트릭 증가만 수행
_ = clusterCache.OnEvent(func(event watch.EventType, un *unstructured.Unstructured) {
gvk := un.GroupVersionKind()
c.metricsServer.IncClusterEventsCount(cluster.Server, gvk.Group, gvk.Kind)
})
// ...
}
getCluster 내부에서는 ClusterCache의 OnResourceUpdated와 OnEvent를 호출해 이벤트 핸들러를 등록합니다. OnEvent는 메트릭만 증가시키므로, 여기서는 OnResourceUpdated의 흐름만 따라가겠습니다.
내부 로직은 다음 순서로 진행됩니다.
- 불필요한 리소스 업데이트는 무시
- 변경된 리소스를 관리하는 애플리케이션을 찾음
- toNotify에 애플리케이션을 등록
- liveStateCache의
onObjectUpdated를 호출
즉 핵심은 마지막 단계에서 liveStateCache의 onObjectUpdated가 호출된다는 점입니다.
그렇다면 onObjectUpdated는 어디서 주입되는지 봐야 합니다. 이 부분은 NewLiveStateCache에서 확인할 수 있습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/cache/cache.go#L173-L193
func NewLiveStateCache(
db db.ArgoDB,
appInformer cache.SharedIndexInformer,
settingsMgr *settings.SettingsManager,
kubectl kube.Kubectl,
metricsServer *metrics.MetricsServer,
onObjectUpdated ObjectUpdatedHandler, // ✅ 이 부분에서 핸들러 등록
clusterSharding sharding.ClusterShardingCache,
resourceTracking argo.ResourceTracking,
) LiveStateCache {
return &liveStateCache{
appInformer: appInformer,
db: db,
clusters: make(map[string]clustercache.ClusterCache),
onObjectUpdated: onObjectUpdated, // ✅ 이 부분에서 핸들러 등록
kubectl: kubectl,
settingsMgr: settingsMgr,
metricsServer: metricsServer,
clusterSharding: clusterSharding,
resourceTracking: resourceTracking,
}
}
이제 NewLiveStateCache가 어디서 호출되는지 보면 됩니다. 이 호출은 ApplicationController를 생성하는 코드에서 확인할 수 있습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L152-L298
// NewApplicationController creates new instance of ApplicationController.
func NewApplicationController(
// ...
) (*ApplicationController, error) {
// ...
ctrl := ApplicationController{
// ...
}
// ...
// ✅ NewLiveStateCache에 ctrl.handleObjectUpdated를 등록
stateCache := statecache.NewLiveStateCache(db, appInformer, ctrl.settingsMgr, kubectl, ctrl.metricsServer, ctrl.handleObjectUpdated, clusterSharding, argo.NewResourceTracking())
// ... controller에 stateCache등록
return &ctrl, nil
}
이때 주입되는 handleObjectUpdated를 보면 다음과 같습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06db/controller/appcontroller.go#L397-L455
func (ctrl *ApplicationController) handleObjectUpdated(managedByApp map[string]bool, ref v1.ObjectReference) {
// ✅ managedByApp은 크기가 대부분의 경우에 1
// 만약 이것보다 크다면 하나의 리소스 업데이트가 두 개의 app에 영향을 준다는 의미
// ...
for appName, isManagedResource := range managedByApp {
// The appName is given as <namespace>_<name>, but the indexer needs it
// format <namespace>/<name>
appKey := ctrl.toAppKey(appName)
obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
app, ok := obj.(*appv1.Application) // ✅ app 가져오기
if exists && err == nil && ok && isSelfReferencedApp(app, ref) {
// Don't force refresh app if related resource is application itself. This prevents infinite reconciliation loop.
continue
}
// ...
// ✅ refreshQueue에 추가하고 후에 refresh 함수 호출
ctrl.requestAppRefresh(app.QualifiedName(), &level, nil)
}
}
로직을 간단히 요약하면, 변경된 리소스와 그 리소스를 관리하는 애플리케이션 목록을 바탕으로 requestAppRefresh를 호출하고, 이후 큐에서 이를 처리하면서 애플리케이션 상태를 다시 계산합니다.
이제 argocd가 다른 클러스터의 이벤트를 어떻게 감지하고 상위 컨트롤러까지 전달하는지 흐름을 정리할 수 있습니다.
- ApplicationController가 생성될 때 LiveStateCache가 생성되고 이벤트 핸들러가 함께 주입됩니다.
- ApplicationController에서 클러스터 리소스를 조회하면 LiveStateCache 내부에서
getCluster가 호출되고, 클러스터별 최초 호출 시 gitops-engine의 ClusterCache를 생성합니다. - 생성된 ClusterCache의
EnsureSynced를 호출하면 모든 클러스터 타입에 대한 Watcher가 동작하게 되고 이벤트 감시를 시작합니다. - 이벤트가 감지되면 등록된 이벤트 핸들러가 호출되고, 상위 레이어로 이벤트가 전달되면서 ApplicationController의
requestAppRefresh가 실행됩니다.
sequenceDiagram
participant ApplicationController
participant LiveStateCache
participant ClusterCache
participant Watcher
ApplicationController->>LiveStateCache: 생성 시 LiveStateCache 생성 및 이벤트 등록
ApplicationController->>LiveStateCache: 클러스터 리소스 조회
LiveStateCache->>LiveStateCache: getSyncedCluster 호출
LiveStateCache->>LiveStateCache: getCluster 호출
LiveStateCache->>ClusterCache: ClusterCache 생성 (클러스터 별 최초 호출 시)
ClusterCache->>ClusterCache: EnsureSynced 호출
ClusterCache->>Watcher: 모든 클러스터 타입에 대한 Watcher 동작 시작
Watcher-->>ClusterCache: 이벤트 감지
ClusterCache-->>LiveStateCache: 등록된 이벤트 핸들러 호출
LiveStateCache-->>ApplicationController: requestAppRefresh 호출
Leave a comment