argocd 분석: app controller와 repo server의 상호 작용
argocd에는 repo-server라는 컴포넌트가 존재합니다. 이 컴포넌트는 manifest가 저장된 원격 저장소를 관리하는 서비스로, k8s manifest를 생성하고 반환합니다. 이전 아티클에서 argocd가 gitops를 구현하기 위해 CompareAppState라는 메서드를 호출했는데, 이 함수 안에는 repo-server로 요청을 보내는 부분이 있습니다. 이번에는 repo-server로 요청을 어디서 보내고 어떻게 처리하는지 확인해보겠습니다.
application controller → repo server 호출
application controller에서 repo server로 요청을 보내는 부분을 확인하려면 CompareAppState 함수를 확인해야 합니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/controller/state.go#L429
func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *v1alpha1.AppProject, revisions []string, sources []v1alpha1.ApplicationSource, noCache bool, noRevisionCache bool, localManifests []string, hasMultipleSources bool, rollback bool) (*comparisonResult, error) {
// ...
if len(localManifests) == 0 {
// ...
// ✅ 1. 원격 저장소에서 대상 object 조회
targetObjs, manifestInfos, revisionUpdated, err = m.GetRepoObjs(app, sources, appLabelKey, revisions, noCache, noRevisionCache, verifySignature, project, rollback)
// ...
}
이 함수에는 GetRepoObjs 메서드를 호출하는 부분이 있습니다. 메서드 내부 구현의 주요 부분을 살펴보면 다음과 같습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/controller/state.go#L127
func (m *appStateManager) GetRepoObjs(app *v1alpha1.Application, sources []v1alpha1.ApplicationSource, appLabelKey string, revisions []string, noCache, noRevisionCache, verifySignature bool, proj *v1alpha1.AppProject, rollback bool) ([]*unstructured.Unstructured, []*apiclient.ManifestResponse, bool, error) {
// ...
// ✅ repoClientSet으로부터 grpc client 생성
conn, repoClient, err := m.repoClientset.NewRepoServerClient()
// ..
for i, source := range sources {
// ...
// ✅ repoClient에 GenerateManifest rpc호출
manifestInfo, err := repoClient.GenerateManifest(context.Background(), &apiclient.ManifestRequest{
// ...
})
// ...
// manifest 구성
}
// ...
}
여기서 NewRepoServerClient를 보면 gRPC 클라이언트를 생성하는 부분을 확인할 수 있습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/apiclient/clientset.go#L49
func (c *clientSet) NewRepoServerClient() (io.Closer, RepoServerServiceClient, error) {
conn, err := NewConnection(c.address, c.timeoutSeconds, &c.tlsConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to open a new connection to repo server: %w", err)
}
return conn, NewRepoServerServiceClient(conn), nil
}
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/apiclient/clientset.go#L57
// 함수 개형만 확인: grpc 클라이언트 연결 생성
func NewConnection(address string, timeoutSeconds int, tlsConfig *TLSConfiguration) (*grpc.ClientConn, error) { ... }
함수 내부에서 grpc 연결과 repo server 클라이언트를 생성하는 것을 볼 수 있습니다. 따라서 NewRepoServerClient 메서드를 호출하여 grpc 클라이언트를 만들고 GenerateManifest rpc를 호출합니다. 그럼 GenerateManifest가 어디서 구현되는지 찾아보겠습니다.
repo server RPC 호출
repo server도 CLI부터 찾아갈 수 있지만, 여기서는 RPC 구현에만 집중하겠습니다. GenerateManifest에 대한 세부 구현은 다음과 같습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L515
func (s *Service) GenerateManifest(ctx context.Context, q *apiclient.ManifestRequest) (*apiclient.ManifestResponse, error) {
// ...
var promise *ManifestResponsePromise
// ✅ 비동기 실행 함수
operation := func(repoRoot, commitSHA, cacheKey string, ctxSrc operationContextSrc) error {
// ...
// ✅ 주요 함수: manifest 생성 호출
// promise capture
promise = s.runManifestGen(ctx, repoRoot, commitSHA, cacheKey, ctxSrc, q)
// The fist channel to send the message will resume this operation.
// The main purpose for using channels here is to be able to unlock
// the repository as soon as the lock in not required anymore. In
// case of CMP the repo is compressed (tgz) and sent to the cmp-server
// for manifest generation.
select {
case err := <-promise.errCh:
return err
case resp := <-promise.responseCh:
res = resp
case tarDone := <-promise.tarDoneCh:
tarConcluded = tarDone
}
return nil
}
// ...
// ✅ 실제 operation은 runRepoOperation 내부에서 실행
err = s.runRepoOperation(ctx, q.Revision, q.Repo, q.ApplicationSource, q.VerifySignature, cacheFn, operation, settings, q.HasMultipleSources, q.RefSources)
// ...
return res, err
}
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L286
func (s *Service) runRepoOperation(
// ...
) error {
// ...
// ✅ helm 분기나 helm이 아닌 분기나 operation을 함수 내부에서 호출
// 다만 차이점은 operation에 전달하는 ctxSrc인자가 다름
if source.IsHelm() {
// ...
return operation(chartPath, revision, revision, func() (*operationContext, error) {
return &operationContext{chartPath, ""}, nil
})
} else {
// ...
// Here commitSHA refers to the SHA of the actual commit, whereas revision refers to the branch/tag name etc
// We use the commitSHA to generate manifests and store them in cache, and revision to retrieve them from cache
return operation(gitClient.Root(), commitSHA, revision, func() (*operationContext, error) {
var signature string
if verifyCommit {
// When the revision is an annotated tag, we need to pass the unresolved revision (i.e. the tag name)
// to the verification routine. For everything else, we work with the SHA that the target revision is
// pointing to (i.e. the resolved revision).
var rev string
if gitClient.IsAnnotatedTag(revision) {
rev = unresolvedRevision
} else {
rev = revision
}
signature, err = gitClient.VerifyCommitSignature(rev)
if err != nil {
return nil, err
}
}
appPath, err := argopath.Path(gitClient.Root(), source.Path)
if err != nil {
return nil, err
}
return &operationContext{appPath, signature}, nil
})
}
}
GenerateManifest 함수 내부에서는 클로저로 operation을 선언하고, 이를 runRepoOperation에 전달하여 실행합니다. runRepoOperation은 내부에서 operation을 직접 호출합니다. 결국 핵심은 operation 내부에서 실행되는 로직입니다. 여기서 runManifestGen 함수의 동작이 operation의 핵심이며, 이 함수에서 manifest를 생성합니다.
그럼 runManifestGen 함수를 확인해보겠습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L677
func (s *Service) runManifestGen(ctx context.Context, repoRoot, commitSHA, cacheKey string, opContextSrc operationContextSrc, q *apiclient.ManifestRequest) *ManifestResponsePromise {
responseCh := make(chan *apiclient.ManifestResponse)
tarDoneCh := make(chan bool)
errCh := make(chan error)
responsePromise := NewManifestResponsePromise(responseCh, tarDoneCh, errCh)
// ✅ 채널 생성 후 고루틴으로 runManifestGenAsync 호출
channels := &generateManifestCh{
responseCh: responseCh,
tarDoneCh: tarDoneCh,
errCh: errCh,
}
go s.runManifestGenAsync(ctx, repoRoot, commitSHA, cacheKey, opContextSrc, q, channels)
return responsePromise
}
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L701
func (s *Service) runManifestGenAsync(ctx context.Context, repoRoot, commitSHA, cacheKey string, opContextSrc operationContextSrc, q *apiclient.ManifestRequest, ch *generateManifestCh) {
// ...
if err == nil {
// 분석 용이를 위해 multiple source가 아닌 경우를 가정
if q.HasMultipleSources {
// ...
}
// ...
// ✅ manifestGenResult 생성
manifestGenResult, err = GenerateManifests(ctx, opContext.appPath, repoRoot, commitSHA, q, false, s.gitCredsStore, s.initConstants.MaxCombinedDirectoryManifestsSize, s.gitRepoPaths, WithCMPTarDoneChannel(ch.tarDoneCh), WithCMPTarExcludedGlobs(s.initConstants.CMPTarExcludedGlobs))
}
// ...
// ✅ cache에 manifest 업데이트
err = s.cache.SetManifests(cacheKey, appSourceCopy, q.RefSources, q, q.Namespace, q.TrackingMethod, q.AppLabelKey, q.AppName, &manifestGenCacheEntry, refSourceCommitSHAs, q.InstallationID)
// ...
// ✅ 동작 결과 responseCh로 주입
ch.responseCh <- manifestGenCacheEntry.ManifestResponse
}
runManifestGen은 runManifestGenAsync를 고루틴으로 호출하고 runManifestGenAsync는 다시 GenerateManifests를 호출하여 결과를 캐시에 저장하고 채널로 리턴합니다.
그럼 실제로 manifest를 구성하는 과정은 GenerateManifests에서 발생하는 것을 알 수 있습니다.
// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L1410
func GenerateManifests(ctx context.Context, appPath, repoRoot, revision string, q *apiclient.ManifestRequest, isLocal bool, gitCredsStore git.CredsStore, maxCombinedManifestQuantity resource.Quantity, gitRepoPaths io.TempPaths, opts ...GenerateManifestOpt) (*apiclient.ManifestResponse, error) {
opt := newGenerateManifestOpt(opts...)
var targetObjs []*unstructured.Unstructured
// ...
// ✅ app source type 조회
appSourceType, err := GetAppSourceType(ctx, q.ApplicationSource, appPath, repoRoot, q.AppName, q.EnabledSourceTypes, opt.cmpTarExcludedGlobs, env.Environ())
// ...
var commands []string
switch appSourceType {
// ✅ helm인 경우
case v1alpha1.ApplicationSourceTypeHelm:
var command string
// ✅ helm template으로부터 targetObjs 생성
targetObjs, command, err = helmTemplate(appPath, repoRoot, env, q, isLocal, gitRepoPaths)
commands = append(commands, command)
// ✅ kustomize인 경우
case v1alpha1.ApplicationSourceTypeKustomize:
// ✅ kustomize binary 등록
kustomizeBinary := ""
if q.KustomizeOptions != nil {
kustomizeBinary = q.KustomizeOptions.BinaryPath
}
// ✅ kustomize를 통해 targetObjs 생성
k := kustomize.NewKustomizeApp(repoRoot, appPath, q.Repo.GetGitCreds(gitCredsStore), repoURL, kustomizeBinary, q.Repo.Proxy, q.Repo.NoProxy)
targetObjs, _, commands, err = k.Build(q.ApplicationSource.Kustomize, q.KustomizeOptions, env, &kustomize.BuildOpts{
KubeVersion: text.SemVer(q.ApplicationSource.GetKubeVersionOrDefault(q.KubeVersion)),
APIVersions: q.ApplicationSource.GetAPIVersionsOrDefault(q.ApiVersions),
})
// ✅ plugin인 경우
case v1alpha1.ApplicationSourceTypePlugin:
// ✅ plugin 등록
pluginName := ""
if q.ApplicationSource.Plugin != nil {
pluginName = q.ApplicationSource.Plugin.Name
}
// if pluginName is provided it has to be `<metadata.name>-<spec.version>` or just `<metadata.name>` if plugin version is empty
// ✅ plugin sidecar로 실행하여 targetObjs 생성
targetObjs, err = runConfigManagementPluginSidecars(ctx, appPath, repoRoot, pluginName, env, q, opt.cmpTarDoneCh, opt.cmpTarExcludedGlobs)
if err != nil {
err = fmt.Errorf("plugin sidecar failed. %s", err.Error())
}
case v1alpha1.ApplicationSourceTypeDirectory:
// ✅ binary인 경우 findManifests로 targetObjs 생성
var directory *v1alpha1.ApplicationSourceDirectory
if directory = q.ApplicationSource.Directory; directory == nil {
directory = &v1alpha1.ApplicationSourceDirectory{}
}
logCtx := log.WithField("application", q.AppName)
targetObjs, err = findManifests(logCtx, appPath, repoRoot, env, *directory, q.EnabledSourceTypes, maxCombinedManifestQuantity)
}
// ...
manifests := make([]string, 0)
for _, obj := range targetObjs {
// ✅ targetObjs를 순회하면서 리소스 타입을 json string으로 변경
}
return &apiclient.ManifestResponse{
Manifests: manifests,
SourceType: string(appSourceType),
Commands: commands,
}, nil
}
해당 함수 내부에서는 source 타입에 따라 적절한 방식으로 manifest를 생성합니다. GenerateManifests에서 manifest가 반환되면 promise의 responseCh에 결과가 등록됩니다. 최종적으로 promise에서 select로 대기하는 operation 클로저가 응답을 수신하고 grpc 요청이 종료됩니다.
참고자료
argocd-repo-server Command Reference - Argo CD - Declarative GitOps CD for Kubernetes
Leave a comment