6 minute read

argocd에는 repo-server라는 컴포넌트가 존재합니다. 이 컴포넌트는 manifest가 저장된 원격 저장소를 관리하는 서비스로, k8s manifest를 생성하고 반환합니다. 이전 아티클에서 argocd가 gitops를 구현하기 위해 CompareAppState라는 메서드를 호출했는데, 이 함수 안에는 repo-server로 요청을 보내는 부분이 있습니다. 이번에는 repo-server로 요청을 어디서 보내고 어떻게 처리하는지 확인해보겠습니다.


application controller → repo server 호출

application controller에서 repo server로 요청을 보내는 부분을 확인하려면 CompareAppState 함수를 확인해야 합니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/controller/state.go#L429
func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *v1alpha1.AppProject, revisions []string, sources []v1alpha1.ApplicationSource, noCache bool, noRevisionCache bool, localManifests []string, hasMultipleSources bool, rollback bool) (*comparisonResult, error) {

	// ...

	
	if len(localManifests) == 0 {

		// ...
		
		// ✅ 1. 원격 저장소에서 대상 object 조회
		targetObjs, manifestInfos, revisionUpdated, err = m.GetRepoObjs(app, sources, appLabelKey, revisions, noCache, noRevisionCache, verifySignature, project, rollback)
		
		// ...

	}

이 함수에는 GetRepoObjs 메서드를 호출하는 부분이 있습니다. 메서드 내부 구현의 주요 부분을 살펴보면 다음과 같습니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/controller/state.go#L127
func (m *appStateManager) GetRepoObjs(app *v1alpha1.Application, sources []v1alpha1.ApplicationSource, appLabelKey string, revisions []string, noCache, noRevisionCache, verifySignature bool, proj *v1alpha1.AppProject, rollback bool) ([]*unstructured.Unstructured, []*apiclient.ManifestResponse, bool, error) {

	// ...

	// ✅ repoClientSet으로부터 grpc client 생성
	conn, repoClient, err := m.repoClientset.NewRepoServerClient()
	
	// ..
	
	for i, source := range sources {
		
		// ...
		
		// ✅ repoClient에 GenerateManifest rpc호출
		manifestInfo, err := repoClient.GenerateManifest(context.Background(), &apiclient.ManifestRequest{
			// ...
		})
		
		// ...
		// manifest 구성

	}
	
	// ...
	
}

여기서 NewRepoServerClient를 보면 gRPC 클라이언트를 생성하는 부분을 확인할 수 있습니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/apiclient/clientset.go#L49
func (c *clientSet) NewRepoServerClient() (io.Closer, RepoServerServiceClient, error) {
	conn, err := NewConnection(c.address, c.timeoutSeconds, &c.tlsConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to open a new connection to repo server: %w", err)
	}
	return conn, NewRepoServerServiceClient(conn), nil
}

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/apiclient/clientset.go#L57
// 함수 개형만 확인: grpc 클라이언트 연결 생성
func NewConnection(address string, timeoutSeconds int, tlsConfig *TLSConfiguration) (*grpc.ClientConn, error) { ... }

함수 내부에서 grpc 연결과 repo server 클라이언트를 생성하는 것을 볼 수 있습니다. 따라서 NewRepoServerClient 메서드를 호출하여 grpc 클라이언트를 만들고 GenerateManifest rpc를 호출합니다. 그럼 GenerateManifest가 어디서 구현되는지 찾아보겠습니다.


repo server RPC 호출

repo server도 CLI부터 찾아갈 수 있지만, 여기서는 RPC 구현에만 집중하겠습니다. GenerateManifest에 대한 세부 구현은 다음과 같습니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L515
func (s *Service) GenerateManifest(ctx context.Context, q *apiclient.ManifestRequest) (*apiclient.ManifestResponse, error) {

	// ...
	
	var promise *ManifestResponsePromise
	
	// ✅ 비동기 실행 함수
	operation := func(repoRoot, commitSHA, cacheKey string, ctxSrc operationContextSrc) error {
		
		// ...

		// ✅ 주요 함수: manifest 생성 호출
		// promise capture
		promise = s.runManifestGen(ctx, repoRoot, commitSHA, cacheKey, ctxSrc, q)
		// The fist channel to send the message will resume this operation.
		// The main purpose for using channels here is to be able to unlock
		// the repository as soon as the lock in not required anymore. In
		// case of CMP the repo is compressed (tgz) and sent to the cmp-server
		// for manifest generation.
		select {
		case err := <-promise.errCh:
			return err
		case resp := <-promise.responseCh:
			res = resp
		case tarDone := <-promise.tarDoneCh:
			tarConcluded = tarDone
		}
		return nil
	}

	// ...
	
	// ✅ 실제 operation은 runRepoOperation 내부에서 실행
	err = s.runRepoOperation(ctx, q.Revision, q.Repo, q.ApplicationSource, q.VerifySignature, cacheFn, operation, settings, q.HasMultipleSources, q.RefSources)

	// ...

	return res, err
}

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L286
func (s *Service) runRepoOperation(
	// ...
) error {

// ...

	// ✅ helm 분기나 helm이 아닌 분기나 operation을 함수 내부에서 호출
	// 다만 차이점은 operation에 전달하는 ctxSrc인자가 다름 
	if source.IsHelm() {
		
		// ...
	
		return operation(chartPath, revision, revision, func() (*operationContext, error) {
			return &operationContext{chartPath, ""}, nil
		})
	} else {
		
		// ...
		
		// Here commitSHA refers to the SHA of the actual commit, whereas revision refers to the branch/tag name etc
		// We use the commitSHA to generate manifests and store them in cache, and revision to retrieve them from cache
		return operation(gitClient.Root(), commitSHA, revision, func() (*operationContext, error) {
			var signature string
			if verifyCommit {
				// When the revision is an annotated tag, we need to pass the unresolved revision (i.e. the tag name)
				// to the verification routine. For everything else, we work with the SHA that the target revision is
				// pointing to (i.e. the resolved revision).
				var rev string
				if gitClient.IsAnnotatedTag(revision) {
					rev = unresolvedRevision
				} else {
					rev = revision
				}
				signature, err = gitClient.VerifyCommitSignature(rev)
				if err != nil {
					return nil, err
				}
			}
			appPath, err := argopath.Path(gitClient.Root(), source.Path)
			if err != nil {
				return nil, err
			}
			return &operationContext{appPath, signature}, nil
		})
	}
}

GenerateManifest 함수 내부에서는 클로저로 operation을 선언하고, 이를 runRepoOperation에 전달하여 실행합니다. runRepoOperation은 내부에서 operation을 직접 호출합니다. 결국 핵심은 operation 내부에서 실행되는 로직입니다. 여기서 runManifestGen 함수의 동작이 operation의 핵심이며, 이 함수에서 manifest를 생성합니다.

그럼 runManifestGen 함수를 확인해보겠습니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L677
func (s *Service) runManifestGen(ctx context.Context, repoRoot, commitSHA, cacheKey string, opContextSrc operationContextSrc, q *apiclient.ManifestRequest) *ManifestResponsePromise {
	responseCh := make(chan *apiclient.ManifestResponse)
	tarDoneCh := make(chan bool)
	errCh := make(chan error)
	responsePromise := NewManifestResponsePromise(responseCh, tarDoneCh, errCh)

	// ✅ 채널 생성 후 고루틴으로 runManifestGenAsync 호출
	channels := &generateManifestCh{
		responseCh: responseCh,
		tarDoneCh:  tarDoneCh,
		errCh:      errCh,
	}
	go s.runManifestGenAsync(ctx, repoRoot, commitSHA, cacheKey, opContextSrc, q, channels)
	return responsePromise
}

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L701
func (s *Service) runManifestGenAsync(ctx context.Context, repoRoot, commitSHA, cacheKey string, opContextSrc operationContextSrc, q *apiclient.ManifestRequest, ch *generateManifestCh) {
	
	// ...
	
	if err == nil {

		// 분석 용이를 위해 multiple source가 아닌 경우를 가정
		if q.HasMultipleSources {
			// ...
		}
	
		// ...
	
		// ✅ manifestGenResult 생성
		manifestGenResult, err = GenerateManifests(ctx, opContext.appPath, repoRoot, commitSHA, q, false, s.gitCredsStore, s.initConstants.MaxCombinedDirectoryManifestsSize, s.gitRepoPaths, WithCMPTarDoneChannel(ch.tarDoneCh), WithCMPTarExcludedGlobs(s.initConstants.CMPTarExcludedGlobs))
	}
	
	// ...
	
	// ✅ cache에 manifest 업데이트
	err = s.cache.SetManifests(cacheKey, appSourceCopy, q.RefSources, q, q.Namespace, q.TrackingMethod, q.AppLabelKey, q.AppName, &manifestGenCacheEntry, refSourceCommitSHAs, q.InstallationID)
	
	// ...
	
	// ✅ 동작 결과 responseCh로 주입
	ch.responseCh <- manifestGenCacheEntry.ManifestResponse
}

runManifestGenrunManifestGenAsync를 고루틴으로 호출하고 runManifestGenAsync는 다시 GenerateManifests를 호출하여 결과를 캐시에 저장하고 채널로 리턴합니다.

그럼 실제로 manifest를 구성하는 과정은 GenerateManifests에서 발생하는 것을 알 수 있습니다.

// https://github.com/argoproj/argo-cd/blob/a70b2293a06be06dbb5fb30d0925331e72a6de14/reposerver/repository/repository.go#L1410
func GenerateManifests(ctx context.Context, appPath, repoRoot, revision string, q *apiclient.ManifestRequest, isLocal bool, gitCredsStore git.CredsStore, maxCombinedManifestQuantity resource.Quantity, gitRepoPaths io.TempPaths, opts ...GenerateManifestOpt) (*apiclient.ManifestResponse, error) {
	opt := newGenerateManifestOpt(opts...)
	var targetObjs []*unstructured.Unstructured

	// ...

	// ✅ app source type 조회
	appSourceType, err := GetAppSourceType(ctx, q.ApplicationSource, appPath, repoRoot, q.AppName, q.EnabledSourceTypes, opt.cmpTarExcludedGlobs, env.Environ())
	
	// ...
	
	var commands []string
	
	switch appSourceType {
	// ✅ helm인 경우
	case v1alpha1.ApplicationSourceTypeHelm:
		var command string
		// ✅ helm template으로부터 targetObjs 생성
		targetObjs, command, err = helmTemplate(appPath, repoRoot, env, q, isLocal, gitRepoPaths)
		commands = append(commands, command)
	// ✅ kustomize인 경우
	case v1alpha1.ApplicationSourceTypeKustomize:
		// ✅ kustomize binary 등록
		kustomizeBinary := ""
		if q.KustomizeOptions != nil {
			kustomizeBinary = q.KustomizeOptions.BinaryPath
		}
		// ✅ kustomize를 통해 targetObjs 생성
		k := kustomize.NewKustomizeApp(repoRoot, appPath, q.Repo.GetGitCreds(gitCredsStore), repoURL, kustomizeBinary, q.Repo.Proxy, q.Repo.NoProxy)
		targetObjs, _, commands, err = k.Build(q.ApplicationSource.Kustomize, q.KustomizeOptions, env, &kustomize.BuildOpts{
			KubeVersion: text.SemVer(q.ApplicationSource.GetKubeVersionOrDefault(q.KubeVersion)),
			APIVersions: q.ApplicationSource.GetAPIVersionsOrDefault(q.ApiVersions),
		})
		// ✅ plugin인 경우
	case v1alpha1.ApplicationSourceTypePlugin:
		// ✅ plugin 등록
		pluginName := ""
		if q.ApplicationSource.Plugin != nil {
			pluginName = q.ApplicationSource.Plugin.Name
		}
		// if pluginName is provided it has to be `<metadata.name>-<spec.version>` or just `<metadata.name>` if plugin version is empty
		// ✅ plugin sidecar로 실행하여 targetObjs 생성
		targetObjs, err = runConfigManagementPluginSidecars(ctx, appPath, repoRoot, pluginName, env, q, opt.cmpTarDoneCh, opt.cmpTarExcludedGlobs)
		if err != nil {
			err = fmt.Errorf("plugin sidecar failed. %s", err.Error())
		}
	case v1alpha1.ApplicationSourceTypeDirectory:
		// ✅ binary인 경우 findManifests로 targetObjs 생성
		var directory *v1alpha1.ApplicationSourceDirectory
		if directory = q.ApplicationSource.Directory; directory == nil {
			directory = &v1alpha1.ApplicationSourceDirectory{}
		}
		logCtx := log.WithField("application", q.AppName)
		targetObjs, err = findManifests(logCtx, appPath, repoRoot, env, *directory, q.EnabledSourceTypes, maxCombinedManifestQuantity)
	}
	
	// ...

	manifests := make([]string, 0)
	for _, obj := range targetObjs {
		// ✅ targetObjs를 순회하면서 리소스 타입을 json string으로 변경
	}

	return &apiclient.ManifestResponse{
		Manifests:  manifests,
		SourceType: string(appSourceType),
		Commands:   commands,
	}, nil
}

해당 함수 내부에서는 source 타입에 따라 적절한 방식으로 manifest를 생성합니다. GenerateManifests에서 manifest가 반환되면 promise의 responseCh에 결과가 등록됩니다. 최종적으로 promise에서 select로 대기하는 operation 클로저가 응답을 수신하고 grpc 요청이 종료됩니다.


참고자료

argocd-repo-server Command Reference - Argo CD - Declarative GitOps CD for Kubernetes

Leave a comment