From fb65327419eb938509cbc65cb3be05c51a1927d2 Mon Sep 17 00:00:00 2001 From: Arvind Choudhary Date: Wed, 25 Sep 2024 01:16:26 -0700 Subject: [PATCH] feat: [AH-231]: Upstream features (#3560) Added support for: 1. Default library/ prefix for docker hub if required. 2. Implemented manifest list support for upstream proxies. New table introduced: `oci_image_index_mappings` 3. Fixed Secret issue on UI for both gitness and harness code 4. Code refactoring to bring controller inside wire and other minor fixes around linting 5. Fixed few bugs around upstream proxy. --- .../0076_oci_image_index_mapping.down.sql | 1 + .../0076_oci_image_index_mapping.up.sql | 16 ++ .../0076_oci_image_index_mapping.down.sql | 1 + .../0076_oci_image_index_mapping.up.sql | 16 ++ cmd/gitness/wire_gen.go | 6 +- .../controller/metadata/create_registry.go | 1 + .../get_artifacts_docker_manifests.go | 57 ++-- .../controller/metadata/update_registry.go | 1 + registry/app/api/handler/oci/get_blob.go | 15 +- registry/app/api/handler/oci/get_manifest.go | 3 + registry/app/pkg/docker/local.go | 32 ++- registry/app/pkg/docker/manifest_service.go | 243 ++++++++++++++---- registry/app/pkg/docker/remote.go | 94 +++++-- registry/app/pkg/docker/wire.go | 36 ++- .../app/remote/controller/proxy/controller.go | 230 +++++++++++++---- registry/app/remote/controller/proxy/local.go | 1 + registry/app/store/database.go | 7 + .../store/database/oci_image_index_mapping.go | 179 +++++++++++++ registry/app/store/database/wire.go | 5 + registry/types/oci_Image_index_mapping.go | 31 +++ 20 files changed, 814 insertions(+), 161 deletions(-) create mode 100644 app/store/database/migrate/postgres/0076_oci_image_index_mapping.down.sql create mode 100644 app/store/database/migrate/postgres/0076_oci_image_index_mapping.up.sql create mode 100644 app/store/database/migrate/sqlite/0076_oci_image_index_mapping.down.sql create mode 100644 app/store/database/migrate/sqlite/0076_oci_image_index_mapping.up.sql create mode 100644 
registry/app/store/database/oci_image_index_mapping.go create mode 100644 registry/types/oci_Image_index_mapping.go diff --git a/app/store/database/migrate/postgres/0076_oci_image_index_mapping.down.sql b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.down.sql new file mode 100644 index 000000000..5d82269d7 --- /dev/null +++ b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS oci_image_index_mappings; \ No newline at end of file diff --git a/app/store/database/migrate/postgres/0076_oci_image_index_mapping.up.sql b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.up.sql new file mode 100644 index 000000000..b4db8b53a --- /dev/null +++ b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS oci_image_index_mappings +( + oci_mapping_id SERIAL PRIMARY KEY, + oci_mapping_parent_manifest_id BIGINT NOT NULL, + oci_mapping_child_digest bytea NOT NULL, + oci_mapping_created_at BIGINT NOT NULL, + oci_mapping_updated_at BIGINT NOT NULL, + oci_mapping_created_by INTEGER NOT NULL, + oci_mapping_updated_by INTEGER NOT NULL, + CONSTRAINT unique_oci_mapping_digests + UNIQUE (oci_mapping_parent_manifest_id, oci_mapping_child_digest), + CONSTRAINT fk_oci_mapping_registry_id + FOREIGN KEY (oci_mapping_parent_manifest_id) + REFERENCES manifests(manifest_id) + ON DELETE CASCADE +) \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.down.sql b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.down.sql new file mode 100644 index 000000000..5d82269d7 --- /dev/null +++ b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS oci_image_index_mappings; \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.up.sql 
b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.up.sql new file mode 100644 index 000000000..f95ba9815 --- /dev/null +++ b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS oci_image_index_mappings +( + oci_mapping_id INTEGER PRIMARY KEY AUTOINCREMENT, + oci_mapping_parent_manifest_id BIGINT NOT NULL, + oci_mapping_child_digest bytea NOT NULL, + oci_mapping_created_at BIGINT NOT NULL, + oci_mapping_updated_at BIGINT NOT NULL, + oci_mapping_created_by INTEGER NOT NULL, + oci_mapping_updated_by INTEGER NOT NULL, + CONSTRAINT unique_oci_mapping_digests + UNIQUE (oci_mapping_parent_manifest_id, oci_mapping_child_digest), + CONSTRAINT fk_oci_mapping_registry_id + FOREIGN KEY (oci_mapping_parent_manifest_id) + REFERENCES manifests(manifest_id) + ON DELETE CASCADE +) \ No newline at end of file diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index 10fcbb7d0..240824c23 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -442,14 +442,16 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro artifactRepository := database2.ProvideArtifactDao(db) layerRepository := database2.ProvideLayerDao(db, mediaTypesRepository) eventReporter := docker.ProvideReporter() - manifestService := docker.ManifestServiceProvider(registryRepository, manifestRepository, blobRepository, mediaTypesRepository, manifestReferenceRepository, tagRepository, imageRepository, artifactRepository, layerRepository, gcService, transactor, eventReporter, spacePathStore) + ociImageIndexMappingRepository := database2.ProvideOCIImageIndexMappingDao(db) + manifestService := docker.ManifestServiceProvider(registryRepository, manifestRepository, blobRepository, mediaTypesRepository, manifestReferenceRepository, tagRepository, imageRepository, artifactRepository, layerRepository, gcService, transactor, eventReporter, spacePathStore, ociImageIndexMappingRepository) 
registryBlobRepository := database2.ProvideRegistryBlobDao(db) bandwidthStatRepository := database2.ProvideBandwidthStatDao(db) downloadStatRepository := database2.ProvideDownloadStatDao(db) localRegistry := docker.LocalRegistryProvider(app, manifestService, blobRepository, registryRepository, manifestRepository, registryBlobRepository, mediaTypesRepository, tagRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository, gcService, transactor) upstreamProxyConfigRepository := database2.ProvideUpstreamDao(db, registryRepository, spacePathStore) secretService := secret3.ProvideSecretService(secretStore, encrypter, spacePathStore) - remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spacePathStore, secretService) + proxyController := docker.ProvideProxyController(localRegistry, manifestService, secretService, spacePathStore) + remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spacePathStore, secretService, proxyController) coreController := pkg.CoreControllerProvider(registryRepository) dockerController := docker.ControllerProvider(localRegistry, remoteRegistry, coreController, spaceStore, authorizer) handler := api2.NewHandlerProvider(dockerController, spaceStore, tokenStore, controller, authenticator, provider, authorizer) diff --git a/registry/app/api/controller/metadata/create_registry.go b/registry/app/api/controller/metadata/create_registry.go index 254ff07b8..732044c3a 100644 --- a/registry/app/api/controller/metadata/create_registry.go +++ b/registry/app/api/controller/metadata/create_registry.go @@ -320,6 +320,7 @@ func (c *APIController) CreateUpstreamProxyEntity( } upstreamProxyConfigEntity.SecretSpaceID = *res.SecretSpaceId + upstreamProxyConfigEntity.SecretIdentifier = *res.SecretIdentifier } return repoEntity, upstreamProxyConfigEntity, nil } diff --git 
a/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go b/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go index 8854aacfa..bce5adf36 100644 --- a/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go +++ b/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go @@ -102,28 +102,9 @@ func (c *APIController) GetDockerArtifactManifests( } manifestDetailsList = append(manifestDetailsList, getManifestDetails(m, mConfig)) case *ml.DeserializedManifestList: - for _, manifestEntry := range reqManifest.Manifests { - dgst, err := types.NewDigest(manifestEntry.Digest) - if err != nil { - return artifactManifestsErrorRs(err), nil - } - referencedManifest, err := c.ManifestStore.FindManifestByDigest(ctx, registry.ID, image, dgst) - if err != nil { - if errors.Is(err, store2.ErrResourceNotFound) { - return artifactManifestsErrorRs( - fmt.Errorf("manifest not found"), - ), nil - } - return artifactManifestsErrorRs(err), nil - } - mConfig, err := getManifestConfig( - ctx, referencedManifest.Configuration.Digest, - regInfo.RootIdentifier, c.StorageDriver, - ) - if err != nil { - return artifactManifestsErrorRs(err), nil - } - manifestDetailsList = append(manifestDetailsList, getManifestDetails(referencedManifest, mConfig)) + manifestDetailsList, err = c.getManifestList(ctx, reqManifest, registry, image, regInfo) + if err != nil { + return artifactManifestsErrorRs(err), nil } default: log.Ctx(ctx).Error().Stack().Err(err).Msgf("Unknown manifest type: %T", manifest) @@ -141,6 +122,38 @@ func (c *APIController) GetDockerArtifactManifests( }, nil } +func (c *APIController) getManifestList( + ctx context.Context, reqManifest *ml.DeserializedManifestList, registry *types.Registry, image string, + regInfo *RegistryRequestBaseInfo, +) ([]artifact.DockerManifestDetails, error) { + manifestDetailsList := []artifact.DockerManifestDetails{} + for _, manifestEntry := range reqManifest.Manifests { + dgst, err := 
types.NewDigest(manifestEntry.Digest) + if err != nil { + return nil, err + } + referencedManifest, err := c.ManifestStore.FindManifestByDigest(ctx, registry.ID, image, dgst) + if err != nil { + if errors.Is(err, store2.ErrResourceNotFound) { + if registry.Type == artifact.RegistryTypeUPSTREAM { + continue + } + return nil, fmt.Errorf("manifest: %s not found", dgst.String()) + } + return nil, err + } + mConfig, err := getManifestConfig( + ctx, referencedManifest.Configuration.Digest, + regInfo.RootIdentifier, c.StorageDriver, + ) + if err != nil { + return nil, err + } + manifestDetailsList = append(manifestDetailsList, getManifestDetails(referencedManifest, mConfig)) + } + return manifestDetailsList, nil +} + func artifactManifestsErrorRs(err error) artifact.GetDockerArtifactManifestsResponseObject { return artifact.GetDockerArtifactManifests500JSONResponse{ InternalServerErrorJSONResponse: artifact.InternalServerErrorJSONResponse( diff --git a/registry/app/api/controller/metadata/update_registry.go b/registry/app/api/controller/metadata/update_registry.go index 7c08d04f5..fe917a94c 100644 --- a/registry/app/api/controller/metadata/update_registry.go +++ b/registry/app/api/controller/metadata/update_registry.go @@ -394,6 +394,7 @@ func (c *APIController) UpdateUpstreamProxyEntity( return nil, nil, err } upstreamProxyConfigEntity.SecretSpaceID = *res.SecretSpaceId + upstreamProxyConfigEntity.SecretIdentifier = *res.SecretIdentifier } else { upstreamProxyConfigEntity.UserName = "" upstreamProxyConfigEntity.SecretIdentifier = "" diff --git a/registry/app/api/handler/oci/get_blob.go b/registry/app/api/handler/oci/get_blob.go index 37f734bf8..e55c2086c 100644 --- a/registry/app/api/handler/oci/get_blob.go +++ b/registry/app/api/handler/oci/get_blob.go @@ -45,10 +45,16 @@ func (h *Handler) GetBlob(w http.ResponseWriter, r *http.Request) { } defer func() { if response.Body != nil { - response.Body.Close() + err := response.Body.Close() + if err != nil { + 
log.Ctx(ctx).Error().Msgf("Failed to close body: %v", err) + } } if response.ReadCloser != nil { - response.ReadCloser.Close() + err := response.ReadCloser.Close() + if err != nil { + log.Ctx(ctx).Error().Msgf("Failed to close readCloser: %v", err) + } } }() @@ -57,6 +63,11 @@ func (h *Handler) GetBlob(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, response.RedirectURL, http.StatusTemporaryRedirect) return } + + if response.ResponseHeaders != nil && response.ResponseHeaders.Code == http.StatusMovedPermanently { + response.ResponseHeaders.WriteToResponse(w) + return + } response.ResponseHeaders.WriteHeadersToResponse(w) if r.Method == http.MethodHead { return diff --git a/registry/app/api/handler/oci/get_manifest.go b/registry/app/api/handler/oci/get_manifest.go index cf65e8c9a..be7f5f430 100644 --- a/registry/app/api/handler/oci/get_manifest.go +++ b/registry/app/api/handler/oci/get_manifest.go @@ -46,6 +46,9 @@ func (h *Handler) GetManifest(w http.ResponseWriter, r *http.Request) { return } response.ResponseHeaders.WriteToResponse(w) + if response.ResponseHeaders.Code == http.StatusMovedPermanently { + return + } _, bytes, _ := response.Manifest.Payload() if _, err := w.Write(bytes); err != nil { log.Ctx(ctx).Error().Err(err).Msg("Failed to write response") diff --git a/registry/app/pkg/docker/local.go b/registry/app/pkg/docker/local.go index 8183e7f9a..af67dd4cf 100644 --- a/registry/app/pkg/docker/local.go +++ b/registry/app/pkg/docker/local.go @@ -404,16 +404,20 @@ func (r *LocalRegistry) fetchBlobInternal( } if http.MethodGet == method { + // This GoRoutine is used to update the bandwidth stat of the artifact go func(art pkg.RegistryInfo, dgst digest.Digest) { // Cloning Context. 
session, _ := request.AuthSessionFrom(ctx) ctx3 := request.WithAuthSession(context.Background(), session) err := r.dbBlobDownloadComplete(ctx3, dgst, info) if err != nil { - log.Error().Stack().Err(err).Msgf("error while putting bandwidth stat of artifact, %v", err) + log.Ctx(ctx3).Error().Stack().Str("goRoutine", + "UpdateBandwidth").Err(err).Msgf("error while putting bandwidth stat of artifact, %v", + err) return } - log.Info().Msgf("Successfully updated the bandwidth stat metrics %s", art.Digest) + log.Ctx(ctx3).Info().Str("goRoutine", + "UpdateBandwidth").Msgf("Successfully updated the bandwidth stat metrics %s", art.Digest) }(info, dgst) } @@ -435,16 +439,23 @@ func (r *LocalRegistry) PullManifest( ifNoneMatchHeader []string, ) (responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifest manifest.Manifest, errs []error) { responseHeaders, descriptor, manifest, errs = r.ManifestExist(ctx, artInfo, acceptHeaders, ifNoneMatchHeader) + + // This GoRoutine is used to update the download stat of the artifact when manifest is pulled go func(art pkg.RegistryInfo) { // Cloning Context. session, _ := request.AuthSessionFrom(ctx) ctx2 := request.WithAuthSession(context.Background(), session) + ctx2 = log.Ctx(ctx2).With(). + Str("goRoutine", "UpdateDownload"). 
+ Logger().WithContext(ctx2) err := r.dbGetManifestComplete(ctx2, artInfo) if err != nil { - log.Error().Stack().Err(err).Msgf("error while putting download stat of artifact, %v", err) + log.Ctx(ctx2).Error().Str("goRoutine", + "UpdateDownload").Stack().Err(err).Msgf("error while putting download stat of artifact, %v", err) return } - log.Info().Msgf("Successfully updated the download stat metrics %s", art.Digest) + log.Ctx(ctx2).Info().Str("goRoutine", + "UpdateDownload").Msgf("Successfully updated the download stat metrics %s", art.Digest) }(artInfo) return responseHeaders, descriptor, manifest, errs } @@ -821,7 +832,7 @@ func (r *LocalRegistry) PutManifest( responseHeaders.Headers["Docker-Content-Digest"] = d.String() responseHeaders.Code = http.StatusCreated - log.Debug().Msg("Succeeded in putting manifest!") + log.Ctx(ctx).Debug().Msgf("Succeeded in putting manifest: %s", d.String()) return responseHeaders, errs } @@ -1651,6 +1662,11 @@ func (r *LocalRegistry) dbGetManifestComplete( ctx context.Context, info pkg.RegistryInfo, ) error { + // FIXME: Update logic incase requests are internal. Currently, we are updating the stats for all requests. 
+ if info.Digest == "" { + return nil + } + err := r.tx.WithTx( ctx, func(ctx context.Context) error { registry, err := r.registryDao.GetByParentIDAndName(ctx, info.ParentID, info.RegIdentifier) @@ -1663,7 +1679,11 @@ func (r *LocalRegistry) dbGetManifestComplete( return err } - artifact, err := r.artifactDao.GetByName(ctx, image.ID, info.Digest) + newDigest, err := types.NewDigest(digest.Digest(info.Digest)) + if err != nil { + log.Ctx(ctx).Error().Stack().Err(err).Msgf("error parsing digest: %s %v", info.Digest, err) + } + artifact, err := r.artifactDao.GetByName(ctx, image.ID, newDigest.String()) if err != nil { return err } diff --git a/registry/app/pkg/docker/manifest_service.go b/registry/app/pkg/docker/manifest_service.go index 48fb6beed..33dfaa6dc 100644 --- a/registry/app/pkg/docker/manifest_service.go +++ b/registry/app/pkg/docker/manifest_service.go @@ -23,7 +23,8 @@ import ( "fmt" "time" - gitnessappstore "github.com/harness/gitness/app/store" + gas "github.com/harness/gitness/app/store" + "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact" "github.com/harness/gitness/registry/app/event" "github.com/harness/gitness/registry/app/manifest" "github.com/harness/gitness/registry/app/manifest/manifestlist" @@ -47,19 +48,20 @@ import ( ) type manifestService struct { - registryDao store.RegistryRepository - manifestDao store.ManifestRepository - layerDao store.LayerRepository - blobRepo store.BlobRepository - mtRepository store.MediaTypesRepository - tagDao store.TagRepository - imageDao store.ImageRepository - artifactDao store.ArtifactRepository - manifestRefDao store.ManifestReferenceRepository - spacePathStore gitnessappstore.SpacePathStore - gcService gc.Service - tx dbtx.Transactor - reporter event.Reporter + registryDao store.RegistryRepository + manifestDao store.ManifestRepository + layerDao store.LayerRepository + blobRepo store.BlobRepository + mtRepository store.MediaTypesRepository + tagDao store.TagRepository + imageDao 
store.ImageRepository + artifactDao store.ArtifactRepository + manifestRefDao store.ManifestReferenceRepository + ociImageIndexMappingDao store.OCIImageIndexMappingRepository + spacePathStore gas.SpacePathStore + gcService gc.Service + tx dbtx.Transactor + reporter event.Reporter } func NewManifestService( @@ -67,22 +69,24 @@ func NewManifestService( blobRepo store.BlobRepository, mtRepository store.MediaTypesRepository, tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository, layerDao store.LayerRepository, manifestRefDao store.ManifestReferenceRepository, - tx dbtx.Transactor, gcService gc.Service, reporter event.Reporter, spacePathStore gitnessappstore.SpacePathStore, + tx dbtx.Transactor, gcService gc.Service, reporter event.Reporter, spacePathStore gas.SpacePathStore, + ociImageIndexMappingDao store.OCIImageIndexMappingRepository, ) ManifestService { return &manifestService{ - registryDao: registryDao, - manifestDao: manifestDao, - layerDao: layerDao, - blobRepo: blobRepo, - mtRepository: mtRepository, - tagDao: tagDao, - artifactDao: artifactDao, - imageDao: imageDao, - manifestRefDao: manifestRefDao, - gcService: gcService, - tx: tx, - reporter: reporter, - spacePathStore: spacePathStore, + registryDao: registryDao, + manifestDao: manifestDao, + layerDao: layerDao, + blobRepo: blobRepo, + mtRepository: mtRepository, + tagDao: tagDao, + artifactDao: artifactDao, + imageDao: imageDao, + manifestRefDao: manifestRefDao, + gcService: gcService, + tx: tx, + reporter: reporter, + spacePathStore: spacePathStore, + ociImageIndexMappingDao: ociImageIndexMappingDao, } } @@ -107,6 +111,7 @@ type ManifestService interface { ) error DeleteTag(ctx context.Context, repoKey string, tag string, info pkg.RegistryInfo) (bool, error) DeleteManifest(ctx context.Context, repoKey string, d digest.Digest, info pkg.RegistryInfo) error + AddManifestAssociation(ctx context.Context, repoKey string, digest digest.Digest, info pkg.RegistryInfo) 
error DBFindRepositoryBlob( ctx context.Context, desc manifest.Descriptor, repoID int64, info pkg.RegistryInfo, @@ -204,7 +209,8 @@ func (l *manifestService) dbTagManifest( } // Create or update artifact and tag records - if err := l.createOrUpdateArtifactAndTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName, dgst); err != nil { + if err := l.upsertArtifactAndTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName, + dgst); err != nil { return formatFailedToTagErr(err) } @@ -246,7 +252,7 @@ func (l *manifestService) lockManifestForGC(ctx context.Context, repoID, manifes } // Creates or updates artifact and tag records. -func (l *manifestService) createOrUpdateArtifactAndTag( +func (l *manifestService) upsertArtifactAndTag( ctx context.Context, registryID, manifestID int64, @@ -418,7 +424,7 @@ func (l *manifestService) dbPutManifestV2( return nil } - log.Debug().Msgf("manifest not found in database") + log.Debug().Msgf("manifest %s not found in database", dgst.String()) cfg := &types.Configuration{ MediaType: mfst.Config().MediaType, @@ -524,6 +530,56 @@ func (l *manifestService) DBFindRepositoryBlob( return b, nil } +// AddManifestAssociation This updates the manifestRefs for all new childDigests to their already existing parent +// manifests. This is used when a manifest from a manifest list is pulled from the remote and manifest list already +// exists in the database. 
+func (l *manifestService) AddManifestAssociation( + ctx context.Context, repoKey string, childDigest digest.Digest, info pkg.RegistryInfo, +) error { + newDigest, err2 := types.NewDigest(childDigest) + if err2 != nil { + return fmt.Errorf("failed to create digest: %s %w", childDigest, err2) + } + r, err := l.registryDao.GetByParentIDAndName(ctx, info.ParentID, repoKey) + if err != nil { + return fmt.Errorf("failed to get registry: %s %w", repoKey, err) + } + childManifest, err2 := l.manifestDao.FindManifestByDigest(ctx, r.ID, info.Image, newDigest) + if err2 != nil { + return fmt.Errorf("failed to find manifest by digest. Repo: %d Image: %s %w", r.ID, info.Image, err2) + } + mappings, err := l.ociImageIndexMappingDao.GetAllByChildDigest(ctx, r.ID, childManifest.ImageName, newDigest) + if err != nil { + return fmt.Errorf("failed to get oci image index mappings. Repo: %d Image: %s %w", + r.ID, + childManifest.ImageName, + err) + } + for _, mapping := range mappings { + parentManifest, err := l.manifestDao.Get(ctx, mapping.ParentManifestID) + if err != nil { + return fmt.Errorf("failed to get manifest with ID: %d %w", mapping.ParentManifestID, err) + } + if err := l.manifestRefDao.AssociateManifest(ctx, parentManifest, childManifest); err != nil { + if errors.Is(err, util.ErrRefManifestNotFound) { + // This can only happen if the online GC deleted one + // of the referenced manifests (because they were + // untagged/unreferenced) between the call to + // `FindAndLockNBefore` and `AssociateManifest`. For now + // we need to return this error to mimic the behaviour + // of the corresponding filesystem validation. + log.Error(). + Msgf("Failed to associate manifest Ref Manifest not found. 
parentDigest:%s childDigest:%s %v", + parentManifest.Digest.String(), + childManifest.Digest.String(), + err) + return err + } + } + } + return nil +} + func (l *manifestService) handleSubject( ctx context.Context, subject manifest.Descriptor, artifactType string, annotations map[string]string, dbRepo *types.Registry, @@ -620,15 +676,9 @@ func (l *manifestService) dbPutManifestList( ImageName: info.Image, } - mm := make([]*types.Manifest, 0, len(manifestList.Manifests)) - ids := make([]int64, 0, len(mm)) - for _, desc := range manifestList.Manifests { - m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) - if err != nil { - return err - } - mm = append(mm, m) - ids = append(ids, m.ID) + mm, ids, err2 := l.validateManifestList(ctx, manifestList, r, info) + if err2 != nil { + return err2 } err = l.tx.WithTx( @@ -675,14 +725,102 @@ func (l *manifestService) dbPutManifestList( return err } } + + err = l.mapManifestList(ctx, ml.ID, manifestList, r) + if err != nil { + return fmt.Errorf("failed to map manifest list: %w", err) + } + return nil }, ) if err != nil { - log.Ctx(ctx).Error().Err(err).Msgf("failed to create manifest list in database") + log.Ctx(ctx).Error().Err(err).Msgf("failed to create manifest list in database: %v", err) + return fmt.Errorf("failed to create manifest list in database: %w", err) } - return err + return nil +} + +func (l *manifestService) validateManifestIndex( + ctx context.Context, manifestList *ocischema.DeserializedImageIndex, r *types.Registry, info pkg.RegistryInfo, +) ([]*types.Manifest, []int64, error) { + mm := make([]*types.Manifest, 0, len(manifestList.Manifests)) + ids := make([]int64, 0, len(manifestList.Manifests)) + for _, desc := range manifestList.Manifests { + m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) + if errors.Is(err, gitnessstore.ErrResourceNotFound) && r.Type == artifact.RegistryTypeUPSTREAM { + continue + } + if err != nil { + return nil, nil, err + } + mm = append(mm, 
m) + ids = append(ids, m.ID) + } + log.Ctx(ctx).Debug().Msgf("validated %d / %d manifests in index", len(mm), len(manifestList.Manifests)) + return mm, ids, nil +} + +func (l *manifestService) mapManifestIndex( + ctx context.Context, mi int64, manifestList *ocischema.DeserializedImageIndex, r *types.Registry, +) error { + if r.Type != artifact.RegistryTypeUPSTREAM { + return nil + } + for _, desc := range manifestList.Manifests { + err := l.ociImageIndexMappingDao.Create(ctx, &types.OCIImageIndexMapping{ + ParentManifestID: mi, + ChildManifestDigest: desc.Digest, + }) + if err != nil { + log.Ctx(ctx).Error().Err(err). + Msgf("failed to create oci image index manifest for digest %s", desc.Digest) + return fmt.Errorf("failed to create oci image index manifest: %w", err) + } + } + log.Ctx(ctx).Debug().Msgf("successfully mapped manifest index %d with its manifests", mi) + return nil +} + +func (l *manifestService) validateManifestList( + ctx context.Context, manifestList *manifestlist.DeserializedManifestList, r *types.Registry, info pkg.RegistryInfo, +) ([]*types.Manifest, []int64, error) { + mm := make([]*types.Manifest, 0, len(manifestList.Manifests)) + ids := make([]int64, 0, len(manifestList.Manifests)) + for _, desc := range manifestList.Manifests { + m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) + if errors.Is(err, gitnessstore.ErrResourceNotFound) && r.Type == artifact.RegistryTypeUPSTREAM { + continue + } + if err != nil { + return nil, nil, err + } + mm = append(mm, m) + ids = append(ids, m.ID) + } + log.Ctx(ctx).Debug().Msgf("validated %d / %d manifests in list", len(mm), len(manifestList.Manifests)) + return mm, ids, nil +} + +func (l *manifestService) mapManifestList( + ctx context.Context, mi int64, manifestList *manifestlist.DeserializedManifestList, r *types.Registry, +) error { + if r.Type != artifact.RegistryTypeUPSTREAM { + return nil + } + for _, desc := range manifestList.Manifests { + err := 
l.ociImageIndexMappingDao.Create(ctx, &types.OCIImageIndexMapping{ + ParentManifestID: mi, + ChildManifestDigest: desc.Digest, + }) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msgf("failed to create oci image index manifest for digest %s", desc.Digest) + return fmt.Errorf("failed to create oci image index manifest: %w", err) + } + } + log.Ctx(ctx).Debug().Msgf("successfully mapped manifest list %d with its manifests", mi) + return nil } func (l *manifestService) dbPutImageIndex( @@ -738,15 +876,9 @@ func (l *manifestService) dbPutImageIndex( return subjectHandlingError } - mm := make([]*types.Manifest, 0, len(imageIndex.Manifests)) - ids := make([]int64, 0, len(mm)) - for _, desc := range imageIndex.Manifests { - m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) - if err != nil { - return err - } - mm = append(mm, m) - ids = append(ids, m.ID) + mm, ids, err := l.validateManifestIndex(ctx, imageIndex, r, info) + if err != nil { + return fmt.Errorf("failed to map manifest index: %w", err) } err = l.tx.WithTx( @@ -792,6 +924,11 @@ func (l *manifestService) dbPutImageIndex( return err } } + + err = l.mapManifestIndex(ctx, mi.ID, imageIndex, r) + if err != nil { + return fmt.Errorf("failed to map manifest index: %w", err) + } return nil }, ) @@ -843,8 +980,8 @@ func (l *manifestService) dbFindManifestListManifest( if err != nil { if errors.Is(err, gitnessstore.ErrResourceNotFound) { return nil, fmt.Errorf( - "manifest %s not found for %s/%s", digest.String(), - repository.Name, imageName, + "manifest %s not found for %s/%s: %w", digest.String(), + repository.Name, imageName, err, ) } return nil, err diff --git a/registry/app/pkg/docker/remote.go b/registry/app/pkg/docker/remote.go index b7e37c544..21fb34489 100644 --- a/registry/app/pkg/docker/remote.go +++ b/registry/app/pkg/docker/remote.go @@ -21,12 +21,16 @@ import ( "fmt" "io" "net/http" + "strings" "time" "github.com/harness/gitness/app/api/request" store2 
"github.com/harness/gitness/app/store" + "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact" "github.com/harness/gitness/registry/app/common/lib/errors" "github.com/harness/gitness/registry/app/manifest" + "github.com/harness/gitness/registry/app/manifest/manifestlist" + "github.com/harness/gitness/registry/app/manifest/schema2" "github.com/harness/gitness/registry/app/pkg" "github.com/harness/gitness/registry/app/pkg/commons" proxy2 "github.com/harness/gitness/registry/app/remote/controller/proxy" @@ -49,14 +53,26 @@ const ( func NewRemoteRegistry( local *LocalRegistry, app *App, upstreamProxyConfigRepo store.UpstreamProxyConfigRepository, - spacePathStore store2.SpacePathStore, secretService secret.Service, + spacePathStore store2.SpacePathStore, secretService secret.Service, proxyCtl proxy2.Controller, ) Registry { + cache := proxy2.GetManifestCache(local, local.ms) + listCache := proxy2.GetManifestListCache(local) + + registry := map[string]proxy2.ManifestCacheHandler{ + manifestlist.MediaTypeManifestList: listCache, + v1.MediaTypeImageIndex: listCache, + schema2.MediaTypeManifest: cache, + proxy2.DefaultHandler: cache, + } + return &RemoteRegistry{ local: local, App: app, upstreamProxyConfigRepo: upstreamProxyConfigRepo, spacePathStore: spacePathStore, secretService: secretService, + manifestCacheHandlerMap: registry, + proxyCtl: proxyCtl, } } @@ -70,21 +86,42 @@ type RemoteRegistry struct { upstreamProxyConfigRepo store.UpstreamProxyConfigRepository spacePathStore store2.SpacePathStore secretService secret.Service + proxyCtl proxy2.Controller + manifestCacheHandlerMap map[string]proxy2.ManifestCacheHandler } func (r *RemoteRegistry) Base() error { panic("Not implemented yet, will be done during Replication flows") } -func defaultLibrary() (bool, string, error) { - // get upstream Repository and check if the path contains library prefix. If yes, redirect to the correct path without - // library prefix. 
- return false, "", nil +// defaultLibrary checks if we need to append "library/" to dockerhub images. For example, if the image is +// "alpine" then we need to append "library/alpine" to the image. +func (r *RemoteRegistry) defaultLibrary(ctx context.Context, artInfo pkg.RegistryInfo) (bool, error) { + upstreamProxy, err := r.upstreamProxyConfigRepo.GetByRegistryIdentifier( + ctx, artInfo.ParentID, artInfo.RegIdentifier, + ) + if err != nil { + return false, err + } + if upstreamProxy.Source != string(artifact.UpstreamConfigSourceDockerhub) { + log.Ctx(ctx).Debug().Msg("upstream proxy source is not Dockerhub") + return false, nil + } + if strings.Contains(artInfo.Image, "/") { + log.Ctx(ctx).Debug().Msgf("image name %s contains a slash", artInfo.Image) + return false, nil + } + return true, nil } // defaultManifestURL return the real url for request with default project. -func defaultManifestURL(regIdentifier string, name string, a pkg.RegistryInfo) string { - return fmt.Sprintf("/v2/%s/library/%s/manifests/%s", regIdentifier, name, a.Reference) +func defaultManifestURL(rootIdentifier string, regIdentifier string, name string, a pkg.RegistryInfo) string { + return fmt.Sprintf("/v2/%s/%s/library/%s/manifests/%s", rootIdentifier, regIdentifier, name, a.Reference) +} + +// defaultBlobURL return the real url for request with default project. +func defaultBlobURL(rootIdentifier string, regIdentifier string, name string, digest string) string { + return fmt.Sprintf("/v2/%s/%s/library/%s/blobs/%s", rootIdentifier, regIdentifier, name, digest) } func proxyManifestHead( @@ -106,6 +143,7 @@ func proxyManifestHead( return errors.NotFoundError(fmt.Errorf("the tag %v:%v is not found", art.Image, art.Tag)) } + // This goRoutine is to update the tag of recently pulled manifest if required if len(art.Tag) > 0 { go func(art pkg.RegistryInfo) { // Write function to update local storage. 
@@ -119,12 +157,17 @@ func proxyManifestHead( for i := 0; i < ensureTagMaxRetry; i++ { time.Sleep(ensureTagInterval) count++ - log.Ctx(ctx).Info().Msgf("Ensure tag: %s for image: %s, retry: %d", tag, info.Image, count) + log.Ctx(ctx2).Info().Str("goRoutine", "EnsureTag").Msgf("Tag %s for image: %s, retry: %d", tag, + info.Image, + count) e := ctl.EnsureTag(ctx2, responseHeaders, art, acceptHeaders, ifNoneMatchHeader) if e != nil { - log.Ctx(ctx).Warn().Err(e).Msgf("Failed to update tag: ") + log.Ctx(ctx2).Warn().Str("goRoutine", + "EnsureTag").Err(e).Msgf("Failed to update tag: %s for image: %s", + tag, info.Image) } else { - log.Ctx(ctx).Info().Msgf("Tag updated: %s for image: %s", tag, info.Image) + log.Ctx(ctx2).Info().Str("goRoutine", "EnsureTag").Msgf("Tag updated: %s for image: %s", tag, + info.Image) return } } @@ -147,20 +190,19 @@ func (r *RemoteRegistry) ManifestExist( responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifestResult manifest.Manifest, errs []error, ) { - proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore) responseHeaders = &commons.ResponseHeaders{ Headers: make(map[string]string), } - defaultProj, name, err := defaultLibrary() + isDefault, err := r.defaultLibrary(ctx, artInfo) if err != nil { errs = append(errs, err) return responseHeaders, descriptor, manifestResult, errs } registryInfo := artInfo - if defaultProj { + if isDefault { responseHeaders.Code = http.StatusMovedPermanently responseHeaders.Headers = map[string]string{ - "Location": defaultManifestURL(artInfo.RegIdentifier, name, registryInfo), + "Location": defaultManifestURL(artInfo.RootIdentifier, artInfo.RegIdentifier, artInfo.Image, registryInfo), } return responseHeaders, descriptor, manifestResult, errs } @@ -183,7 +225,7 @@ func (r *RemoteRegistry) ManifestExist( errs = append(errs, errors.New("Proxy is down")) return responseHeaders, descriptor, manifestResult, errs } - useLocal, man, err := 
proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) + useLocal, man, err := r.proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) if err != nil { errs = append(errs, err) @@ -210,7 +252,7 @@ func (r *RemoteRegistry) ManifestExist( err = proxyManifestHead( ctx, responseHeaders, - proxyCtl, + r.proxyCtl, registryInfo, remoteHelper, artInfo, @@ -237,20 +279,19 @@ func (r *RemoteRegistry) PullManifest( responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifestResult manifest.Manifest, errs []error, ) { - proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore) responseHeaders = &commons.ResponseHeaders{ Headers: make(map[string]string), } - defaultProj, name, err := defaultLibrary() + isDefault, err := r.defaultLibrary(ctx, artInfo) if err != nil { errs = append(errs, err) return responseHeaders, descriptor, manifestResult, errs } registryInfo := artInfo - if defaultProj { + if isDefault { responseHeaders.Code = http.StatusMovedPermanently responseHeaders.Headers = map[string]string{ - "Location": defaultManifestURL(artInfo.RegIdentifier, name, registryInfo), + "Location": defaultManifestURL(artInfo.RootIdentifier, artInfo.RegIdentifier, artInfo.Image, registryInfo), } return responseHeaders, descriptor, manifestResult, errs } @@ -272,7 +313,7 @@ func (r *RemoteRegistry) PullManifest( errs = append(errs, errors.New("Proxy is down")) return responseHeaders, descriptor, manifestResult, errs } - useLocal, man, err := proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) + useLocal, man, err := r.proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) if err != nil { errs = append(errs, err) @@ -304,7 +345,7 @@ func (r *RemoteRegistry) PullManifest( manifestResult, err = proxyManifestGet( ctx, responseHeaders, - proxyCtl, + r.proxyCtl, registryInfo, 
remoteHelper, artInfo.RegIdentifier, @@ -352,7 +393,6 @@ func (r *RemoteRegistry) fetchBlobInternal( responseHeaders *commons.ResponseHeaders, fr *storage.FileReader, size int64, readCloser io.ReadCloser, redirectURL string, errs []error, ) { - proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore) responseHeaders = &commons.ResponseHeaders{ Headers: make(map[string]string), } @@ -360,7 +400,7 @@ func (r *RemoteRegistry) fetchBlobInternal( log.Ctx(ctx).Info().Msgf("Proxy: %s", repoKey) // Handle dockerhub request without library prefix. - isDefault, name, err := defaultLibrary() + isDefault, err := r.defaultLibrary(ctx, info) if err != nil { errs = append(errs, err) return responseHeaders, fr, size, readCloser, redirectURL, errs @@ -369,7 +409,7 @@ func (r *RemoteRegistry) fetchBlobInternal( if isDefault { responseHeaders.Code = http.StatusMovedPermanently responseHeaders.Headers = map[string]string{ - "Location": defaultManifestURL(repoKey, name, registryInfo), + "Location": defaultBlobURL(info.RootIdentifier, repoKey, info.Image, info.Digest), } return responseHeaders, fr, size, readCloser, redirectURL, errs } @@ -378,7 +418,7 @@ func (r *RemoteRegistry) fetchBlobInternal( errs = append(errs, errors.New("Blob not found")) } - if proxyCtl.UseLocalBlob(ctx, registryInfo) { + if r.proxyCtl.UseLocalBlob(ctx, registryInfo) { switch method { case http.MethodGet: headers, reader, s, closer, url, e := r.local.GetBlob(ctx, info) @@ -398,7 +438,7 @@ func (r *RemoteRegistry) fetchBlobInternal( } // This is start of proxy Code. 
- size, readCloser, err = proxyCtl.ProxyBlob(ctx, registryInfo, repoKey, *upstreamProxy) + size, readCloser, err = r.proxyCtl.ProxyBlob(ctx, registryInfo, repoKey, *upstreamProxy) if err != nil { errs = append(errs, err) return responseHeaders, fr, size, readCloser, redirectURL, errs diff --git a/registry/app/pkg/docker/wire.go b/registry/app/pkg/docker/wire.go index 96ebdd8a8..8a24cc016 100644 --- a/registry/app/pkg/docker/wire.go +++ b/registry/app/pkg/docker/wire.go @@ -19,7 +19,10 @@ import ( gitnessstore "github.com/harness/gitness/app/store" storagedriver "github.com/harness/gitness/registry/app/driver" "github.com/harness/gitness/registry/app/event" + "github.com/harness/gitness/registry/app/manifest/manifestlist" + "github.com/harness/gitness/registry/app/manifest/schema2" "github.com/harness/gitness/registry/app/pkg" + proxy2 "github.com/harness/gitness/registry/app/remote/controller/proxy" "github.com/harness/gitness/registry/app/storage" "github.com/harness/gitness/registry/app/store" "github.com/harness/gitness/registry/gc" @@ -28,6 +31,7 @@ import ( "github.com/harness/gitness/types" "github.com/google/wire" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) func LocalRegistryProvider( @@ -51,18 +55,21 @@ func ManifestServiceProvider( manifestRefDao store.ManifestReferenceRepository, tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository, layerDao store.LayerRepository, gcService gc.Service, tx dbtx.Transactor, reporter event.Reporter, spacePathStore gitnessstore.SpacePathStore, + ociImageIndexMappingDao store.OCIImageIndexMappingRepository, ) ManifestService { return NewManifestService( registryDao, manifestDao, blobRepo, mtRepository, tagDao, imageDao, artifactDao, layerDao, manifestRefDao, tx, gcService, reporter, spacePathStore, + ociImageIndexMappingDao, ) } func RemoteRegistryProvider( local *LocalRegistry, app *App, upstreamProxyConfigRepo store.UpstreamProxyConfigRepository, - spacePathStore 
gitnessstore.SpacePathStore, secretService secret.Service, + spacePathStore gitnessstore.SpacePathStore, secretService secret.Service, proxyCtrl proxy2.Controller, ) *RemoteRegistry { - return NewRemoteRegistry(local, app, upstreamProxyConfigRepo, spacePathStore, secretService).(*RemoteRegistry) + return NewRemoteRegistry(local, app, upstreamProxyConfigRepo, spacePathStore, secretService, + proxyCtrl).(*RemoteRegistry) } func ControllerProvider( @@ -83,8 +90,31 @@ func ProvideReporter() event.Reporter { return &event.Noop{} } +func ProvideProxyController( + registry *LocalRegistry, ms ManifestService, secretService secret.Service, + spacePathStore gitnessstore.SpacePathStore, +) proxy2.Controller { + manifestCacheHandler := getManifestCacheHandler(registry, ms) + return proxy2.NewProxyController(registry, ms, secretService, spacePathStore, manifestCacheHandler) +} + +func getManifestCacheHandler( + registry *LocalRegistry, ms ManifestService, +) map[string]proxy2.ManifestCacheHandler { + cache := proxy2.GetManifestCache(registry, ms) + listCache := proxy2.GetManifestListCache(registry) + + return map[string]proxy2.ManifestCacheHandler{ + manifestlist.MediaTypeManifestList: listCache, + v1.MediaTypeImageIndex: listCache, + schema2.MediaTypeManifest: cache, + proxy2.DefaultHandler: cache, + } +} + var ControllerSet = wire.NewSet(ControllerProvider) var RegistrySet = wire.NewSet(LocalRegistryProvider, ManifestServiceProvider, RemoteRegistryProvider) +var ProxySet = wire.NewSet(ProvideProxyController) var StorageServiceSet = wire.NewSet(StorageServiceProvider) var AppSet = wire.NewSet(NewApp) -var WireSet = wire.NewSet(ControllerSet, RegistrySet, StorageServiceSet, AppSet) +var WireSet = wire.NewSet(ControllerSet, RegistrySet, StorageServiceSet, AppSet, ProxySet) diff --git a/registry/app/remote/controller/proxy/controller.go b/registry/app/remote/controller/proxy/controller.go index c5dc4aa2e..54e53f60b 100644 --- a/registry/app/remote/controller/proxy/controller.go 
+++ b/registry/app/remote/controller/proxy/controller.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "net/url" - "sync" "time" "github.com/harness/gitness/app/api/request" @@ -41,18 +40,13 @@ import ( const ( // wait more time than manifest (maxManifestWait) because manifest list depends on manifest ready. - maxManifestListWait = 20 - maxManifestWait = 10 - sleepIntervalSec = 20 + maxManifestWait = 10 + maxManifestMappingWait = 10 + maxManifestMappingIntervalSec = 10 + sleepIntervalSec = 20 // keep manifest list in cache for one week. ) -var ( - // Ctl is a global proxy controller instance. - ctl Controller - once sync.Once -) - // Controller defines the operations related with pull through proxy. type Controller interface { // UseLocalBlob check if the blob should use localRegistry copy. @@ -94,29 +88,25 @@ type Controller interface { } type controller struct { - localRegistry registryInterface - localManifestRegistry registryManifestInterface - secretService secret.Service - spacePathStore store.SpacePathStore + localRegistry registryInterface + localManifestRegistry registryManifestInterface + secretService secret.Service + spacePathStore store.SpacePathStore + manifestCacheHandlerMap map[string]ManifestCacheHandler } -// ControllerInstance -- get the proxy controller instance. -func ControllerInstance( +// NewProxyController -- get the proxy controller instance. 
+func NewProxyController( l registryInterface, lm registryManifestInterface, secretService secret.Service, - spacePathStore store.SpacePathStore, + spacePathStore store.SpacePathStore, manifestCacheHandlerMap map[string]ManifestCacheHandler, ) Controller { - once.Do( - func() { - ctl = &controller{ - localRegistry: l, - localManifestRegistry: lm, - secretService: secretService, - spacePathStore: spacePathStore, - } - }, - ) - - return ctl + return &controller{ + localRegistry: l, + localManifestRegistry: lm, + secretService: secretService, + spacePathStore: spacePathStore, + manifestCacheHandlerMap: manifestCacheHandlerMap, + } } func (c *controller) EnsureTag( @@ -133,7 +123,7 @@ func (c *controller) EnsureTag( return err[0] } - //Fixme: Need to properly pick tag. + // Fixme: Need to properly pick tag. e := c.localManifestRegistry.DBTag(ctx, mfst, desc.Digest, info.Reference, info.RegIdentifier, rsHeaders, info) if e != nil { log.Error().Err(e).Msgf("Error in ensuring tag: %s", e) @@ -147,7 +137,7 @@ func (c *controller) UseLocalBlob(ctx context.Context, art pkg.RegistryInfo) boo } // TODO: Get from Local storage. _, _, _, _, _, e := c.localRegistry.GetBlob(ctx, art) - return e == nil + return len(e) == 0 } // ManifestList ... @@ -177,16 +167,18 @@ func (c *controller) UseLocalManifest( remoteRepo := getRemoteRepo(art) exist, desc, err := remote.ManifestExist(remoteRepo, getReference(art)) // HEAD. - log.Info().Msgf("Manifest exist: %t %s %d %s", exist, desc.Digest.String(), desc.Size, desc.MediaType) // TODO: Check for rate limit error. if err != nil { if errors.IsRateLimitError(err) { // if rate limit, use localRegistry if it exists, otherwise return error. 
+ log.Ctx(ctx).Warn().Msgf("Rate limit error: %v", err) return true, nil, nil } + log.Ctx(ctx).Warn().Msgf("Error in checking remote manifest exist: %v", err) return false, nil, err } + log.Info().Msgf("Manifest exist: %t %s %d %s", exist, desc.Digest.String(), desc.Size, desc.MediaType) - // TODO: Delete if does not exist on remote. + // TODO: Delete if does not exist on remote. Validate this if !exist || desc == nil { go func() { c.localRegistry.DeleteManifest(ctx, art) @@ -194,7 +186,7 @@ func (c *controller) UseLocalManifest( return false, nil, errors.NotFoundError(fmt.Errorf("registry %v, tag %v not found", art.RegIdentifier, art.Tag)) } - log.Info().Msgf("Manifest: %s %s", man, getReference(art)) + log.Info().Msgf("Manifest: %s", getReference(art)) mediaType, payload, _ := man.Payload() return true, &ManifestList{payload, d.Digest.String(), mediaType}, nil @@ -228,13 +220,13 @@ func (c *controller) ProxyManifest( } return man, err } - ct, payload, err := man.Payload() + ct, _, err := man.Payload() log.Info().Msgf("Content type: %s", ct) if err != nil { return man, err } - // Push manifest in background. + // This GoRoutine is to push the manifest from Remote to Local registry. 
go func(_, ct string) { session, _ := request.AuthSessionFrom(ctx) ctx2 := request.WithAuthSession(context.Background(), session) @@ -242,10 +234,13 @@ func (c *controller) ProxyManifest( for n := 0; n < maxManifestWait; n++ { time.Sleep(sleepIntervalSec * time.Second) count++ - log.Info().Msgf("Current retry=%v artifact: %v:%v", count, repoKey, imageName) + log.Ctx(ctx2).Info().Str("goRoutine", "UpdateManifest").Msgf("Current retry=%v artifact: %v:%v, digest: %s", + count, repoKey, imageName, + art.Digest) _, des, _, e := c.localRegistry.PullManifest(ctx2, art, acceptHeader, ifNoneMatchHeader) - if e != nil { - log.Info().Stack().Err(err).Msgf("failed to get manifest during remote cache update, error %v", err) + if len(e) > 0 { + log.Ctx(ctx2).Info().Str("goRoutine", + "UpdateManifest").Stack().Err(err).Msgf("Local manifest doesn't exist, error %v", e[0]) } // Push manifest to localRegistry when pull with digest, or artifact not found, or digest mismatch. errs := []error{} @@ -254,17 +249,21 @@ func (c *controller) ProxyManifest( if len(artInfo.Digest) == 0 { artInfo.Digest = dig } - // Push manifest to localRegistry. - _, errs = c.localRegistry.PutManifest(ctx2, art, ct, ByteToReadCloser(payload), int64(len(payload))) + + err = c.waitAndPushManifest(ctx2, art, ct, man) + if err != nil { + continue + } } // Query artifact after push. if e == nil || commons.IsEmpty(errs) { _, _, _, err := c.localRegistry.PullManifest(ctx2, art, acceptHeader, ifNoneMatchHeader) if err != nil { - log.Error().Stack().Msgf("failed to get manifest, error %v", err) + log.Ctx(ctx2).Error().Str("goRoutine", + "UpdateManifest").Stack().Msgf("failed to get manifest, error %v", err) } else { - log.Info().Msgf( + log.Ctx(ctx2).Info().Str("goRoutine", "UpdateManifest").Msgf( "Completed manifest push to localRegistry registry. 
Image: %s, Tag: %s, Digest: %s", art.Image, art.Tag, art.Digest, ) @@ -308,15 +307,27 @@ func (c *controller) ProxyBlob( return 0, nil, errcode.ErrorCodeBlobUnknown.WithDetail(art.Digest) } desc := manifest.Descriptor{Size: size, Digest: digest.Digest(art.Digest)} + + // This GoRoutine is to push the blob from Remote to Local registry. No retry logic is defined here. go func(art pkg.RegistryInfo) { // Cloning Context. - session, _ := request.AuthSessionFrom(ctx) + session, ok := request.AuthSessionFrom(ctx) + if !ok { + log.Error().Stack().Err(err).Msg("failed to get auth session from context") + return + } ctx2 := request.WithAuthSession(context.Background(), session) + ctx2 = log.Ctx(ctx2).With(). + Str("goRoutine", "AddBlob"). + Logger().WithContext(ctx2) err := c.putBlobToLocal(ctx2, art, remoteImage, repoKey, desc, rHelper) if err != nil { - log.Error().Stack().Err(err).Msgf("error while putting blob to localRegistry registry, %v", err) + log.Ctx(ctx2).Error().Str("goRoutine", + "AddBlob").Stack().Err(err).Msgf("error while putting blob to localRegistry registry, %v", err) + return } - log.Info().Msgf("Successfully updated the cache for digest %s", art.Digest) + log.Ctx(ctx2).Info().Str("goRoutine", "AddBlob").Msgf("Successfully updated the cache for digest %s", + art.Digest) }(art) return size, bReader, nil } @@ -358,6 +369,24 @@ func (c *controller) putBlobToLocal( return err } +func (c *controller) waitAndPushManifest( + ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest, +) error { + h, ok := c.manifestCacheHandlerMap[contentType] + if !ok { + h, ok = c.manifestCacheHandlerMap[DefaultHandler] + if !ok { + return fmt.Errorf("failed to get default manifest cache handler") + } + } + err := h.CacheContent(ctx, art, contentType, man) + if err != nil { + log.Error().Stack().Err(err).Msgf("Error in caching manifest: %s", err) + return err + } + return nil +} + func getRemoteRepo(art pkg.RegistryInfo) string { return art.Image } 
@@ -368,3 +397,127 @@ func getReference(art pkg.RegistryInfo) string {
 	}
 	return art.Tag
 }
+
+// DefaultHandler is the handler-map key used as the fallback when no handler
+// is registered for a manifest's content type.
+const DefaultHandler = "default"
+
+// ManifestCache default Manifest handler.
+type ManifestCache struct {
+	localRegistry         registryInterface
+	localManifestRegistry registryManifestInterface
+}
+
+// GetManifestCache returns a ManifestCache that uses the given local registries.
+func GetManifestCache(localRegistry registryInterface, localManifestRegistry registryManifestInterface) *ManifestCache {
+	return &ManifestCache{
+		localRegistry:         localRegistry,
+		localManifestRegistry: localManifestRegistry,
+	}
+}
+
+// ManifestListCache handle Manifest list type and index type.
+type ManifestListCache struct {
+	localRegistry registryInterface
+}
+
+// GetManifestListCache returns a ManifestListCache that uses the given local registry.
+func GetManifestListCache(localRegistry registryInterface) *ManifestListCache {
+	return &ManifestListCache{localRegistry: localRegistry}
+}
+
+// ManifestCacheHandler define how to cache manifest content.
+type ManifestCacheHandler interface {
+	// CacheContent - cache the content of the manifest
+	CacheContent(ctx context.Context, art pkg.RegistryInfo, contentType string, m manifest.Manifest) error
+}
+
+// CacheContent pushes the manifest payload to the local registry and then tries
+// to add the manifest association, sleeping maxManifestMappingIntervalSec seconds
+// before each of up to maxManifestMappingWait attempts. It returns nil as soon as
+// an attempt succeeds and the last association error (wrapped) otherwise.
+func (m *ManifestCache) CacheContent(
+	ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest,
+) error {
+	_, payload, err := man.Payload()
+	if err != nil {
+		return err
+	}
+	// Push manifest to localRegistry.
+	_, errs := m.localRegistry.PutManifest(ctx, art, contentType, ByteToReadCloser(payload), int64(len(payload)))
+	if len(errs) > 0 {
+		return errs[0]
+	}
+
+	for n := 0; n < maxManifestMappingWait; n++ {
+		time.Sleep(maxManifestMappingIntervalSec * time.Second)
+		err = m.localManifestRegistry.AddManifestAssociation(ctx, art.RegIdentifier, digest.Digest(art.Digest), art)
+		if err != nil {
+			log.Error().Stack().Err(err).Msgf("failed to add manifest association, error %v", err)
+			continue
+		}
+		// Report success only once the association has actually been stored.
+		log.Ctx(ctx).Info().Msgf("Successfully cached manifest for image: %s, tag: %s, digest: %s",
+			art.Image, art.Tag, art.Digest)
+		return nil
+	}
+	// Every attempt failed: surface the last error instead of logging success.
+	return fmt.Errorf("failed to add manifest association after %d attempts: %w", maxManifestMappingWait, err)
+}
+
+// CacheContent stores a manifest list / image index in the local registry. It
+// fails when the artifact has no reference; when art.Digest is empty the digest
+// is computed from the payload before pushing.
+func (m *ManifestListCache) CacheContent(
+	ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest,
+) error {
+	_, payload, err := man.Payload()
+	if err != nil {
+		log.Error().Msg("failed to get payload")
+		return err
+	}
+	if len(getReference(art)) == 0 {
+		log.Error().Msg("failed to get reference, reference is empty, skip to cache manifest list")
+		return fmt.Errorf("failed to get reference, reference is empty, skip to cache manifest list")
+	}
+	// cache key should contain digest if digest exist
+	if len(art.Digest) == 0 {
+		art.Digest = string(digest.FromBytes(payload))
+	}
+
+	if err = m.push(ctx, art, man, contentType); err != nil {
+		log.Error().Msgf("error when push manifest list to local :%v", err)
+		return err
+	}
+	log.Ctx(ctx).Info().Msgf("Successfully cached manifest list for image: %s, tag: %s, digest: %s",
+		art.Image, art.Tag, art.Digest)
+	return nil
+}
+
+// push writes the manifest list to the local registry unless a manifest with
+// the same digest can already be pulled from it.
+func (m *ManifestListCache) push(
+	ctx context.Context, art pkg.RegistryInfo, man manifest.Manifest, contentType string,
+) error {
+	if len(man.References()) == 0 {
+		return errors.New("manifest list doesn't contain any pushed manifest")
+	}
+	_, pl, err := man.Payload()
+	if err != nil {
+		log.Error().Msgf("failed to get payload, error %v", err)
+		return err
+	}
+	log.Debug().Msgf("The manifest list payload: %v", string(pl))
+	newDig := digest.FromBytes(pl)
+	// Because the manifest list maybe updated, need to recheck if it is exist in local
+	_, descriptor, manifest2, _ := m.localRegistry.PullManifest(ctx, art, nil, nil)
+	if manifest2 != nil && descriptor.Digest == newDig {
+		return nil
+	}
+
+	_, errs := m.localRegistry.PutManifest(ctx, art, contentType, ByteToReadCloser(pl), int64(len(pl)))
+	if len(errs) > 0 {
+		return errs[0]
+	}
+	return nil
+}
diff --git a/registry/app/remote/controller/proxy/local.go b/registry/app/remote/controller/proxy/local.go
index c4702576f..64e6aa6d1 100644
--- a/registry/app/remote/controller/proxy/local.go
+++ b/registry/app/remote/controller/proxy/local.go
@@ -86,4 +86,5 @@ type registryManifestInterface interface {
 		headers *commons.ResponseHeaders,
 		info pkg.RegistryInfo,
 	) error
+	AddManifestAssociation(ctx context.Context, repoKey string, digest digest.Digest, info pkg.RegistryInfo) error
 }
diff --git a/registry/app/store/database.go b/registry/app/store/database.go
index b26885a5c..a991feb54 100644
--- a/registry/app/store/database.go
+++ b/registry/app/store/database.go
@@ -144,6 +144,13 @@ type ManifestReferenceRepository interface {
 	) error
 }
 
+type OCIImageIndexMappingRepository interface {
+	Create(ctx context.Context, ociManifest *types.OCIImageIndexMapping) error
+	GetAllByChildDigest(ctx context.Context, registryID int64, imageName string, childDigest types.Digest) (
+		[]*types.OCIImageIndexMapping, error,
+	)
+}
+
 type LayerRepository interface {
 	AssociateLayerBlob(ctx context.Context, m *types.Manifest, b *types.Blob) error
 }
diff --git a/registry/app/store/database/oci_image_index_mapping.go b/registry/app/store/database/oci_image_index_mapping.go
new file mode 100644
index 000000000..badf779d5
--- /dev/null
+++ b/registry/app/store/database/oci_image_index_mapping.go
@@ -0,0 +1,207 @@
+// Copyright 2023 Harness, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package database
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/harness/gitness/app/api/request"
+	"github.com/harness/gitness/registry/app/store"
+	"github.com/harness/gitness/registry/app/store/database/util"
+	"github.com/harness/gitness/registry/types"
+	store2 "github.com/harness/gitness/store"
+	databaseg "github.com/harness/gitness/store/database"
+	"github.com/harness/gitness/store/database/dbtx"
+
+	"github.com/jmoiron/sqlx"
+	"github.com/rs/zerolog/log"
+)
+
+// ociImageIndexMappingDao is the sqlx-backed store.OCIImageIndexMappingRepository.
+type ociImageIndexMappingDao struct {
+	db *sqlx.DB
+}
+
+// NewOCIImageIndexMappingDao returns a repository that uses the given database handle.
+func NewOCIImageIndexMappingDao(db *sqlx.DB) store.OCIImageIndexMappingRepository {
+	return &ociImageIndexMappingDao{
+		db: db,
+	}
+}
+
+// ociImageIndexMappingDB mirrors one row of the oci_image_index_mappings table.
+type ociImageIndexMappingDB struct {
+	ID               int64  `db:"oci_mapping_id"`
+	ParentManifestID int64  `db:"oci_mapping_parent_manifest_id"`
+	ChildDigest      []byte `db:"oci_mapping_child_digest"`
+	CreatedAt        int64  `db:"oci_mapping_created_at"`
+	UpdatedAt        int64  `db:"oci_mapping_updated_at"`
+	CreatedBy        int64  `db:"oci_mapping_created_by"`
+	UpdatedBy        int64  `db:"oci_mapping_updated_by"`
+}
+
+// Create inserts the parent-manifest/child-digest mapping. An already existing
+// mapping is not an error: the insert uses ON CONFLICT DO NOTHING, in which case
+// no row is returned, ociManifest.ID is left untouched and nil is returned.
+func (dao *ociImageIndexMappingDao) Create(
+	ctx context.Context,
+	ociManifest *types.OCIImageIndexMapping,
+) error {
+	const sqlQuery = `
+		INSERT INTO oci_image_index_mappings (
+			oci_mapping_parent_manifest_id,
+			oci_mapping_child_digest,
+			oci_mapping_created_at,
+			oci_mapping_updated_at,
+			oci_mapping_created_by,
+			oci_mapping_updated_by
+		) VALUES (
+			:oci_mapping_parent_manifest_id,
+			:oci_mapping_child_digest,
+			:oci_mapping_created_at,
+			:oci_mapping_updated_at,
+			:oci_mapping_created_by,
+			:oci_mapping_updated_by
+		) ON CONFLICT (oci_mapping_parent_manifest_id, oci_mapping_child_digest)
+			DO NOTHING
+		RETURNING oci_mapping_id`
+
+	db := dbtx.GetAccessor(ctx, dao.db)
+	internalManifest := mapToInternalOCIMapping(ctx, ociManifest)
+	query, args, err := db.BindNamed(sqlQuery, internalManifest)
+	if err != nil {
+		return databaseg.ProcessSQLErrorf(ctx, err, "Bind query failed")
+	}
+
+	if err = db.QueryRowContext(ctx, query, args...).Scan(&ociManifest.ID); err != nil {
+		// DO NOTHING yields no RETURNING row for a duplicate, so Scan reports
+		// sql.ErrNoRows; that is the expected idempotent outcome.
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil
+		}
+		err = databaseg.ProcessSQLErrorf(ctx, err, "QueryRowContext failed")
+		if errors.Is(err, store2.ErrDuplicate) {
+			return nil
+		}
+		return fmt.Errorf("inserting OCI image index mapping: %w", err)
+	}
+	return nil
+}
+
+// GetAllByChildDigest returns every mapping whose child digest equals childDigest
+// and whose parent manifest belongs to the given registry and image name.
+func (dao *ociImageIndexMappingDao) GetAllByChildDigest(
+	ctx context.Context, registryID int64, imageName string, childDigest types.Digest,
+) ([]*types.OCIImageIndexMapping, error) {
+	digestBytes, err := util.GetHexDecodedBytes(string(childDigest))
+	if err != nil {
+		return nil, fmt.Errorf("failed to get digest bytes: %w", err)
+	}
+	const sqlQuery = `
+		SELECT
+			oci_mapping_id,
+			oci_mapping_parent_manifest_id,
+			oci_mapping_child_digest,
+			oci_mapping_created_at,
+			oci_mapping_updated_at,
+			oci_mapping_created_by,
+			oci_mapping_updated_by
+		FROM
+			oci_image_index_mappings
+		JOIN manifests ON manifests.manifest_id = oci_image_index_mappings.oci_mapping_parent_manifest_id
+		WHERE
+			manifest_registry_id = $1 AND
+			manifest_image_name = $2 AND
+			oci_mapping_child_digest = $3`
+
+	db := dbtx.GetAccessor(ctx, dao.db)
+	rows, err := db.QueryxContext(ctx, sqlQuery, registryID, imageName, digestBytes)
+	if err != nil {
+		return nil, databaseg.ProcessSQLErrorf(ctx, err, "QueryxContext failed")
+	}
+	defer rows.Close()
+
+	var manifests []*types.OCIImageIndexMapping
+	for rows.Next() {
+		var dbManifest ociImageIndexMappingDB
+		if err := rows.StructScan(&dbManifest); err != nil {
+			return nil, databaseg.ProcessSQLErrorf(ctx, err, "StructScan failed")
+		}
+		manifests = append(manifests, mapToExternalOCIManifest(&dbManifest))
+	}
+	// rows.Next also stops on an iteration error, so check it before returning.
+	if err := rows.Err(); err != nil {
+		return nil, databaseg.ProcessSQLErrorf(ctx, err, "rows iteration failed")
+	}
+	return manifests, nil
+}
+
+// mapToInternalOCIMapping converts in to its row form. It mutates in: CreatedAt
+// (when zero) and UpdatedAt are set to now, CreatedBy (when zero) and UpdatedBy to
+// the session principal. A digest conversion failure is only logged.
+func mapToInternalOCIMapping(ctx context.Context, in *types.OCIImageIndexMapping) *ociImageIndexMappingDB {
+	if in.CreatedAt.IsZero() {
+		in.CreatedAt = time.Now()
+	}
+	in.UpdatedAt = time.Now()
+	// Guard against a missing auth session: fall back to principal 0 instead
+	// of dereferencing a nil session.
+	var principalID int64
+	if session, ok := request.AuthSessionFrom(ctx); ok && session != nil {
+		principalID = session.Principal.ID
+	}
+	if in.CreatedBy == 0 {
+		in.CreatedBy = principalID
+	}
+	in.UpdatedBy = principalID
+	childBytes, err := types.GetDigestBytes(in.ChildManifestDigest)
+	if err != nil {
+		log.Error().Msgf("failed to get digest bytes: %v", err)
+	}
+
+	return &ociImageIndexMappingDB{
+		ID:               in.ID,
+		ParentManifestID: in.ParentManifestID,
+		ChildDigest:      childBytes,
+		CreatedAt:        in.CreatedAt.UnixMilli(),
+		UpdatedAt:        in.UpdatedAt.UnixMilli(),
+		CreatedBy:        in.CreatedBy,
+		UpdatedBy:        in.UpdatedBy,
+	}
+}
+
+// mapToExternalOCIManifest converts a database row back to the API type. A child
+// digest that fails to parse is only logged.
+func mapToExternalOCIManifest(in *ociImageIndexMappingDB) *types.OCIImageIndexMapping {
+	childDgst := types.Digest(util.GetHexEncodedString(in.ChildDigest))
+	parsedChildDigest, err := childDgst.Parse()
+	if err != nil {
+		log.Error().Msgf("failed to parse child digest: %v", err)
+	}
+
+	return &types.OCIImageIndexMapping{
+		ID:                  in.ID,
+		ParentManifestID:    in.ParentManifestID,
+		ChildManifestDigest: parsedChildDigest,
+		CreatedAt:           time.UnixMilli(in.CreatedAt),
+		UpdatedAt:           time.UnixMilli(in.UpdatedAt),
+		CreatedBy:           in.CreatedBy,
+		UpdatedBy:           in.UpdatedBy,
+	}
+}
diff --git a/registry/app/store/database/wire.go b/registry/app/store/database/wire.go
index af13fffb6..293043c18 100644
--- a/registry/app/store/database/wire.go
+++ b/registry/app/store/database/wire.go
@@ -75,6 +75,10 @@ func ProvideManifestRefDao(db *sqlx.DB) store.ManifestReferenceRepository {
 	return NewManifestReferenceDao(db)
 }
 
+func ProvideOCIImageIndexMappingDao(db *sqlx.DB) store.OCIImageIndexMappingRepository {
+	return NewOCIImageIndexMappingDao(db)
+}
+
 func ProvideLayerDao(db *sqlx.DB, mtRepository store.MediaTypesRepository) store.LayerRepository {
 	return NewLayersDao(db, mtRepository)
 }
@@ -93,6 +97,7 @@ var WireSet = wire.NewSet(
 	ProvideManifestDao,
 	ProvideCleanupPolicyDao,
 	ProvideManifestRefDao,
+	ProvideOCIImageIndexMappingDao,
 	ProvideLayerDao,
 	ProvideImageDao,
 	ProvideArtifactDao,
diff --git a/registry/types/oci_Image_index_mapping.go b/registry/types/oci_Image_index_mapping.go
new file mode 100644
index 000000000..5e308690a
--- /dev/null
+++ b/registry/types/oci_Image_index_mapping.go
@@ -0,0 +1,32 @@
+// Copyright 2023 Harness, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package types
+
+import (
+	"time"
+
+	"github.com/opencontainers/go-digest"
+)
+
+// OCIImageIndexMapping links a parent manifest (by ID) to the digest of one child manifest.
+type OCIImageIndexMapping struct {
+	ID                  int64
+	ParentManifestID    int64
+	ChildManifestDigest digest.Digest
+	CreatedAt           time.Time
+	UpdatedAt           time.Time
+	CreatedBy           int64
+	UpdatedBy           int64
+}