mirror of
https://github.com/harness/drone.git
synced 2025-05-02 22:43:44 +08:00
feat: [AH-231]: Upstream features (#3560)
Added support for: 1. Default library/ prefix for docker hub if required. 2. Implemented manifest list support for upstream proxies. New table introduced: `oci_image_index_mappings` 3. Fixed Secret issue on UI for both gitness and harness code 4. Code refactoring to bring controller inside wire and other minor fixes around linting 5. Fixed few bugs around upstream proxy.
This commit is contained in:
parent
d789e5775e
commit
fb65327419
@ -0,0 +1 @@
|
||||
DROP TABLE IF EXISTS oci_image_index_mappings;
|
@ -0,0 +1,16 @@
|
||||
CREATE TABLE IF NOT EXISTS oci_image_index_mappings
|
||||
(
|
||||
oci_mapping_id SERIAL PRIMARY KEY,
|
||||
oci_mapping_parent_manifest_id BIGINT NOT NULL,
|
||||
oci_mapping_child_digest bytea NOT NULL,
|
||||
oci_mapping_created_at BIGINT NOT NULL,
|
||||
oci_mapping_updated_at BIGINT NOT NULL,
|
||||
oci_mapping_created_by INTEGER NOT NULL,
|
||||
oci_mapping_updated_by INTEGER NOT NULL,
|
||||
CONSTRAINT unique_oci_mapping_digests
|
||||
UNIQUE (oci_mapping_parent_manifest_id, oci_mapping_child_digest),
|
||||
CONSTRAINT fk_oci_mapping_registry_id
|
||||
FOREIGN KEY (oci_mapping_parent_manifest_id)
|
||||
REFERENCES manifests(manifest_id)
|
||||
ON DELETE CASCADE
|
||||
)
|
@ -0,0 +1 @@
|
||||
DROP TABLE IF EXISTS oci_image_index_mappings;
|
@ -0,0 +1,16 @@
|
||||
CREATE TABLE IF NOT EXISTS oci_image_index_mappings
|
||||
(
|
||||
oci_mapping_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
oci_mapping_parent_manifest_id BIGINT NOT NULL,
|
||||
oci_mapping_child_digest bytea NOT NULL,
|
||||
oci_mapping_created_at BIGINT NOT NULL,
|
||||
oci_mapping_updated_at BIGINT NOT NULL,
|
||||
oci_mapping_created_by INTEGER NOT NULL,
|
||||
oci_mapping_updated_by INTEGER NOT NULL,
|
||||
CONSTRAINT unique_oci_mapping_digests
|
||||
UNIQUE (oci_mapping_parent_manifest_id, oci_mapping_child_digest),
|
||||
CONSTRAINT fk_oci_mapping_registry_id
|
||||
FOREIGN KEY (oci_mapping_parent_manifest_id)
|
||||
REFERENCES manifests(manifest_id)
|
||||
ON DELETE CASCADE
|
||||
)
|
@ -442,14 +442,16 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
|
||||
artifactRepository := database2.ProvideArtifactDao(db)
|
||||
layerRepository := database2.ProvideLayerDao(db, mediaTypesRepository)
|
||||
eventReporter := docker.ProvideReporter()
|
||||
manifestService := docker.ManifestServiceProvider(registryRepository, manifestRepository, blobRepository, mediaTypesRepository, manifestReferenceRepository, tagRepository, imageRepository, artifactRepository, layerRepository, gcService, transactor, eventReporter, spacePathStore)
|
||||
ociImageIndexMappingRepository := database2.ProvideOCIImageIndexMappingDao(db)
|
||||
manifestService := docker.ManifestServiceProvider(registryRepository, manifestRepository, blobRepository, mediaTypesRepository, manifestReferenceRepository, tagRepository, imageRepository, artifactRepository, layerRepository, gcService, transactor, eventReporter, spacePathStore, ociImageIndexMappingRepository)
|
||||
registryBlobRepository := database2.ProvideRegistryBlobDao(db)
|
||||
bandwidthStatRepository := database2.ProvideBandwidthStatDao(db)
|
||||
downloadStatRepository := database2.ProvideDownloadStatDao(db)
|
||||
localRegistry := docker.LocalRegistryProvider(app, manifestService, blobRepository, registryRepository, manifestRepository, registryBlobRepository, mediaTypesRepository, tagRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository, gcService, transactor)
|
||||
upstreamProxyConfigRepository := database2.ProvideUpstreamDao(db, registryRepository, spacePathStore)
|
||||
secretService := secret3.ProvideSecretService(secretStore, encrypter, spacePathStore)
|
||||
remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spacePathStore, secretService)
|
||||
proxyController := docker.ProvideProxyController(localRegistry, manifestService, secretService, spacePathStore)
|
||||
remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spacePathStore, secretService, proxyController)
|
||||
coreController := pkg.CoreControllerProvider(registryRepository)
|
||||
dockerController := docker.ControllerProvider(localRegistry, remoteRegistry, coreController, spaceStore, authorizer)
|
||||
handler := api2.NewHandlerProvider(dockerController, spaceStore, tokenStore, controller, authenticator, provider, authorizer)
|
||||
|
@ -320,6 +320,7 @@ func (c *APIController) CreateUpstreamProxyEntity(
|
||||
}
|
||||
|
||||
upstreamProxyConfigEntity.SecretSpaceID = *res.SecretSpaceId
|
||||
upstreamProxyConfigEntity.SecretIdentifier = *res.SecretIdentifier
|
||||
}
|
||||
return repoEntity, upstreamProxyConfigEntity, nil
|
||||
}
|
||||
|
@ -102,28 +102,9 @@ func (c *APIController) GetDockerArtifactManifests(
|
||||
}
|
||||
manifestDetailsList = append(manifestDetailsList, getManifestDetails(m, mConfig))
|
||||
case *ml.DeserializedManifestList:
|
||||
for _, manifestEntry := range reqManifest.Manifests {
|
||||
dgst, err := types.NewDigest(manifestEntry.Digest)
|
||||
if err != nil {
|
||||
return artifactManifestsErrorRs(err), nil
|
||||
}
|
||||
referencedManifest, err := c.ManifestStore.FindManifestByDigest(ctx, registry.ID, image, dgst)
|
||||
if err != nil {
|
||||
if errors.Is(err, store2.ErrResourceNotFound) {
|
||||
return artifactManifestsErrorRs(
|
||||
fmt.Errorf("manifest not found"),
|
||||
), nil
|
||||
}
|
||||
return artifactManifestsErrorRs(err), nil
|
||||
}
|
||||
mConfig, err := getManifestConfig(
|
||||
ctx, referencedManifest.Configuration.Digest,
|
||||
regInfo.RootIdentifier, c.StorageDriver,
|
||||
)
|
||||
if err != nil {
|
||||
return artifactManifestsErrorRs(err), nil
|
||||
}
|
||||
manifestDetailsList = append(manifestDetailsList, getManifestDetails(referencedManifest, mConfig))
|
||||
manifestDetailsList, err = c.getManifestList(ctx, reqManifest, registry, image, regInfo)
|
||||
if err != nil {
|
||||
return artifactManifestsErrorRs(err), nil
|
||||
}
|
||||
default:
|
||||
log.Ctx(ctx).Error().Stack().Err(err).Msgf("Unknown manifest type: %T", manifest)
|
||||
@ -141,6 +122,38 @@ func (c *APIController) GetDockerArtifactManifests(
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *APIController) getManifestList(
|
||||
ctx context.Context, reqManifest *ml.DeserializedManifestList, registry *types.Registry, image string,
|
||||
regInfo *RegistryRequestBaseInfo,
|
||||
) ([]artifact.DockerManifestDetails, error) {
|
||||
manifestDetailsList := []artifact.DockerManifestDetails{}
|
||||
for _, manifestEntry := range reqManifest.Manifests {
|
||||
dgst, err := types.NewDigest(manifestEntry.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
referencedManifest, err := c.ManifestStore.FindManifestByDigest(ctx, registry.ID, image, dgst)
|
||||
if err != nil {
|
||||
if errors.Is(err, store2.ErrResourceNotFound) {
|
||||
if registry.Type == artifact.RegistryTypeUPSTREAM {
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("manifest: %s not found", dgst.String())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
mConfig, err := getManifestConfig(
|
||||
ctx, referencedManifest.Configuration.Digest,
|
||||
regInfo.RootIdentifier, c.StorageDriver,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
manifestDetailsList = append(manifestDetailsList, getManifestDetails(referencedManifest, mConfig))
|
||||
}
|
||||
return manifestDetailsList, nil
|
||||
}
|
||||
|
||||
func artifactManifestsErrorRs(err error) artifact.GetDockerArtifactManifestsResponseObject {
|
||||
return artifact.GetDockerArtifactManifests500JSONResponse{
|
||||
InternalServerErrorJSONResponse: artifact.InternalServerErrorJSONResponse(
|
||||
|
@ -394,6 +394,7 @@ func (c *APIController) UpdateUpstreamProxyEntity(
|
||||
return nil, nil, err
|
||||
}
|
||||
upstreamProxyConfigEntity.SecretSpaceID = *res.SecretSpaceId
|
||||
upstreamProxyConfigEntity.SecretIdentifier = *res.SecretIdentifier
|
||||
} else {
|
||||
upstreamProxyConfigEntity.UserName = ""
|
||||
upstreamProxyConfigEntity.SecretIdentifier = ""
|
||||
|
@ -45,10 +45,16 @@ func (h *Handler) GetBlob(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
defer func() {
|
||||
if response.Body != nil {
|
||||
response.Body.Close()
|
||||
err := response.Body.Close()
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Msgf("Failed to close body: %v", err)
|
||||
}
|
||||
}
|
||||
if response.ReadCloser != nil {
|
||||
response.ReadCloser.Close()
|
||||
err := response.ReadCloser.Close()
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Msgf("Failed to close readCloser: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -57,6 +63,11 @@ func (h *Handler) GetBlob(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, response.RedirectURL, http.StatusTemporaryRedirect)
|
||||
return
|
||||
}
|
||||
|
||||
if response.ResponseHeaders != nil && response.ResponseHeaders.Code == http.StatusMovedPermanently {
|
||||
response.ResponseHeaders.WriteToResponse(w)
|
||||
return
|
||||
}
|
||||
response.ResponseHeaders.WriteHeadersToResponse(w)
|
||||
if r.Method == http.MethodHead {
|
||||
return
|
||||
|
@ -46,6 +46,9 @@ func (h *Handler) GetManifest(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
response.ResponseHeaders.WriteToResponse(w)
|
||||
if response.ResponseHeaders.Code == http.StatusMovedPermanently {
|
||||
return
|
||||
}
|
||||
_, bytes, _ := response.Manifest.Payload()
|
||||
if _, err := w.Write(bytes); err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).Msg("Failed to write response")
|
||||
|
@ -404,16 +404,20 @@ func (r *LocalRegistry) fetchBlobInternal(
|
||||
}
|
||||
|
||||
if http.MethodGet == method {
|
||||
// This GoRoutine is used to update the bandwidth stat of the artifact
|
||||
go func(art pkg.RegistryInfo, dgst digest.Digest) {
|
||||
// Cloning Context.
|
||||
session, _ := request.AuthSessionFrom(ctx)
|
||||
ctx3 := request.WithAuthSession(context.Background(), session)
|
||||
err := r.dbBlobDownloadComplete(ctx3, dgst, info)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("error while putting bandwidth stat of artifact, %v", err)
|
||||
log.Ctx(ctx3).Error().Stack().Str("goRoutine",
|
||||
"UpdateBandwidth").Err(err).Msgf("error while putting bandwidth stat of artifact, %v",
|
||||
err)
|
||||
return
|
||||
}
|
||||
log.Info().Msgf("Successfully updated the bandwidth stat metrics %s", art.Digest)
|
||||
log.Ctx(ctx3).Info().Str("goRoutine",
|
||||
"UpdateBandwidth").Msgf("Successfully updated the bandwidth stat metrics %s", art.Digest)
|
||||
}(info, dgst)
|
||||
}
|
||||
|
||||
@ -435,16 +439,23 @@ func (r *LocalRegistry) PullManifest(
|
||||
ifNoneMatchHeader []string,
|
||||
) (responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifest manifest.Manifest, errs []error) {
|
||||
responseHeaders, descriptor, manifest, errs = r.ManifestExist(ctx, artInfo, acceptHeaders, ifNoneMatchHeader)
|
||||
|
||||
// This GoRoutine is used to update the download stat of the artifact when manifest is pulled
|
||||
go func(art pkg.RegistryInfo) {
|
||||
// Cloning Context.
|
||||
session, _ := request.AuthSessionFrom(ctx)
|
||||
ctx2 := request.WithAuthSession(context.Background(), session)
|
||||
ctx2 = log.Ctx(ctx2).With().
|
||||
Str("goRoutine", "UpdateDownload").
|
||||
Logger().WithContext(ctx2)
|
||||
err := r.dbGetManifestComplete(ctx2, artInfo)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("error while putting download stat of artifact, %v", err)
|
||||
log.Ctx(ctx2).Error().Str("goRoutine",
|
||||
"UpdateDownload").Stack().Err(err).Msgf("error while putting download stat of artifact, %v", err)
|
||||
return
|
||||
}
|
||||
log.Info().Msgf("Successfully updated the download stat metrics %s", art.Digest)
|
||||
log.Ctx(ctx2).Info().Str("goRoutine",
|
||||
"UpdateDownload").Msgf("Successfully updated the download stat metrics %s", art.Digest)
|
||||
}(artInfo)
|
||||
return responseHeaders, descriptor, manifest, errs
|
||||
}
|
||||
@ -821,7 +832,7 @@ func (r *LocalRegistry) PutManifest(
|
||||
responseHeaders.Headers["Docker-Content-Digest"] = d.String()
|
||||
responseHeaders.Code = http.StatusCreated
|
||||
|
||||
log.Debug().Msg("Succeeded in putting manifest!")
|
||||
log.Ctx(ctx).Debug().Msgf("Succeeded in putting manifest: %s", d.String())
|
||||
return responseHeaders, errs
|
||||
}
|
||||
|
||||
@ -1651,6 +1662,11 @@ func (r *LocalRegistry) dbGetManifestComplete(
|
||||
ctx context.Context,
|
||||
info pkg.RegistryInfo,
|
||||
) error {
|
||||
// FIXME: Update logic incase requests are internal. Currently, we are updating the stats for all requests.
|
||||
if info.Digest == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := r.tx.WithTx(
|
||||
ctx, func(ctx context.Context) error {
|
||||
registry, err := r.registryDao.GetByParentIDAndName(ctx, info.ParentID, info.RegIdentifier)
|
||||
@ -1663,7 +1679,11 @@ func (r *LocalRegistry) dbGetManifestComplete(
|
||||
return err
|
||||
}
|
||||
|
||||
artifact, err := r.artifactDao.GetByName(ctx, image.ID, info.Digest)
|
||||
newDigest, err := types.NewDigest(digest.Digest(info.Digest))
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Stack().Err(err).Msgf("error parsing digest: %s %v", info.Digest, err)
|
||||
}
|
||||
artifact, err := r.artifactDao.GetByName(ctx, image.ID, newDigest.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -23,7 +23,8 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
gitnessappstore "github.com/harness/gitness/app/store"
|
||||
gas "github.com/harness/gitness/app/store"
|
||||
"github.com/harness/gitness/registry/app/api/openapi/contracts/artifact"
|
||||
"github.com/harness/gitness/registry/app/event"
|
||||
"github.com/harness/gitness/registry/app/manifest"
|
||||
"github.com/harness/gitness/registry/app/manifest/manifestlist"
|
||||
@ -47,19 +48,20 @@ import (
|
||||
)
|
||||
|
||||
type manifestService struct {
|
||||
registryDao store.RegistryRepository
|
||||
manifestDao store.ManifestRepository
|
||||
layerDao store.LayerRepository
|
||||
blobRepo store.BlobRepository
|
||||
mtRepository store.MediaTypesRepository
|
||||
tagDao store.TagRepository
|
||||
imageDao store.ImageRepository
|
||||
artifactDao store.ArtifactRepository
|
||||
manifestRefDao store.ManifestReferenceRepository
|
||||
spacePathStore gitnessappstore.SpacePathStore
|
||||
gcService gc.Service
|
||||
tx dbtx.Transactor
|
||||
reporter event.Reporter
|
||||
registryDao store.RegistryRepository
|
||||
manifestDao store.ManifestRepository
|
||||
layerDao store.LayerRepository
|
||||
blobRepo store.BlobRepository
|
||||
mtRepository store.MediaTypesRepository
|
||||
tagDao store.TagRepository
|
||||
imageDao store.ImageRepository
|
||||
artifactDao store.ArtifactRepository
|
||||
manifestRefDao store.ManifestReferenceRepository
|
||||
ociImageIndexMappingDao store.OCIImageIndexMappingRepository
|
||||
spacePathStore gas.SpacePathStore
|
||||
gcService gc.Service
|
||||
tx dbtx.Transactor
|
||||
reporter event.Reporter
|
||||
}
|
||||
|
||||
func NewManifestService(
|
||||
@ -67,22 +69,24 @@ func NewManifestService(
|
||||
blobRepo store.BlobRepository, mtRepository store.MediaTypesRepository, tagDao store.TagRepository,
|
||||
imageDao store.ImageRepository, artifactDao store.ArtifactRepository,
|
||||
layerDao store.LayerRepository, manifestRefDao store.ManifestReferenceRepository,
|
||||
tx dbtx.Transactor, gcService gc.Service, reporter event.Reporter, spacePathStore gitnessappstore.SpacePathStore,
|
||||
tx dbtx.Transactor, gcService gc.Service, reporter event.Reporter, spacePathStore gas.SpacePathStore,
|
||||
ociImageIndexMappingDao store.OCIImageIndexMappingRepository,
|
||||
) ManifestService {
|
||||
return &manifestService{
|
||||
registryDao: registryDao,
|
||||
manifestDao: manifestDao,
|
||||
layerDao: layerDao,
|
||||
blobRepo: blobRepo,
|
||||
mtRepository: mtRepository,
|
||||
tagDao: tagDao,
|
||||
artifactDao: artifactDao,
|
||||
imageDao: imageDao,
|
||||
manifestRefDao: manifestRefDao,
|
||||
gcService: gcService,
|
||||
tx: tx,
|
||||
reporter: reporter,
|
||||
spacePathStore: spacePathStore,
|
||||
registryDao: registryDao,
|
||||
manifestDao: manifestDao,
|
||||
layerDao: layerDao,
|
||||
blobRepo: blobRepo,
|
||||
mtRepository: mtRepository,
|
||||
tagDao: tagDao,
|
||||
artifactDao: artifactDao,
|
||||
imageDao: imageDao,
|
||||
manifestRefDao: manifestRefDao,
|
||||
gcService: gcService,
|
||||
tx: tx,
|
||||
reporter: reporter,
|
||||
spacePathStore: spacePathStore,
|
||||
ociImageIndexMappingDao: ociImageIndexMappingDao,
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,6 +111,7 @@ type ManifestService interface {
|
||||
) error
|
||||
DeleteTag(ctx context.Context, repoKey string, tag string, info pkg.RegistryInfo) (bool, error)
|
||||
DeleteManifest(ctx context.Context, repoKey string, d digest.Digest, info pkg.RegistryInfo) error
|
||||
AddManifestAssociation(ctx context.Context, repoKey string, digest digest.Digest, info pkg.RegistryInfo) error
|
||||
DBFindRepositoryBlob(
|
||||
ctx context.Context, desc manifest.Descriptor, repoID int64,
|
||||
info pkg.RegistryInfo,
|
||||
@ -204,7 +209,8 @@ func (l *manifestService) dbTagManifest(
|
||||
}
|
||||
|
||||
// Create or update artifact and tag records
|
||||
if err := l.createOrUpdateArtifactAndTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName, dgst); err != nil {
|
||||
if err := l.upsertArtifactAndTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName,
|
||||
dgst); err != nil {
|
||||
return formatFailedToTagErr(err)
|
||||
}
|
||||
|
||||
@ -246,7 +252,7 @@ func (l *manifestService) lockManifestForGC(ctx context.Context, repoID, manifes
|
||||
}
|
||||
|
||||
// Creates or updates artifact and tag records.
|
||||
func (l *manifestService) createOrUpdateArtifactAndTag(
|
||||
func (l *manifestService) upsertArtifactAndTag(
|
||||
ctx context.Context,
|
||||
registryID,
|
||||
manifestID int64,
|
||||
@ -418,7 +424,7 @@ func (l *manifestService) dbPutManifestV2(
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debug().Msgf("manifest not found in database")
|
||||
log.Debug().Msgf("manifest %s not found in database", dgst.String())
|
||||
|
||||
cfg := &types.Configuration{
|
||||
MediaType: mfst.Config().MediaType,
|
||||
@ -524,6 +530,56 @@ func (l *manifestService) DBFindRepositoryBlob(
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// AddManifestAssociation This updates the manifestRefs for all new childDigests to their already existing parent
|
||||
// manifests. This is used when a manifest from a manifest list is pulled from the remote and manifest list already
|
||||
// exists in the database.
|
||||
func (l *manifestService) AddManifestAssociation(
|
||||
ctx context.Context, repoKey string, childDigest digest.Digest, info pkg.RegistryInfo,
|
||||
) error {
|
||||
newDigest, err2 := types.NewDigest(childDigest)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("failed to create digest: %s %w", childDigest, err2)
|
||||
}
|
||||
r, err := l.registryDao.GetByParentIDAndName(ctx, info.ParentID, repoKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get registry: %s %w", repoKey, err)
|
||||
}
|
||||
childManifest, err2 := l.manifestDao.FindManifestByDigest(ctx, r.ID, info.Image, newDigest)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("failed to find manifest by digest. Repo: %d Image: %s %w", r.ID, info.Image, err2)
|
||||
}
|
||||
mappings, err := l.ociImageIndexMappingDao.GetAllByChildDigest(ctx, r.ID, childManifest.ImageName, newDigest)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get oci image index mappings. Repo: %d Image: %s %w",
|
||||
r.ID,
|
||||
childManifest.ImageName,
|
||||
err)
|
||||
}
|
||||
for _, mapping := range mappings {
|
||||
parentManifest, err := l.manifestDao.Get(ctx, mapping.ParentManifestID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get manifest with ID: %d %w", mapping.ParentManifestID, err)
|
||||
}
|
||||
if err := l.manifestRefDao.AssociateManifest(ctx, parentManifest, childManifest); err != nil {
|
||||
if errors.Is(err, util.ErrRefManifestNotFound) {
|
||||
// This can only happen if the online GC deleted one
|
||||
// of the referenced manifests (because they were
|
||||
// untagged/unreferenced) between the call to
|
||||
// `FindAndLockNBefore` and `AssociateManifest`. For now
|
||||
// we need to return this error to mimic the behaviour
|
||||
// of the corresponding filesystem validation.
|
||||
log.Error().
|
||||
Msgf("Failed to associate manifest Ref Manifest not found. parentDigest:%s childDigest:%s %v",
|
||||
parentManifest.Digest.String(),
|
||||
childManifest.Digest.String(),
|
||||
err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *manifestService) handleSubject(
|
||||
ctx context.Context, subject manifest.Descriptor,
|
||||
artifactType string, annotations map[string]string, dbRepo *types.Registry,
|
||||
@ -620,15 +676,9 @@ func (l *manifestService) dbPutManifestList(
|
||||
ImageName: info.Image,
|
||||
}
|
||||
|
||||
mm := make([]*types.Manifest, 0, len(manifestList.Manifests))
|
||||
ids := make([]int64, 0, len(mm))
|
||||
for _, desc := range manifestList.Manifests {
|
||||
m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mm = append(mm, m)
|
||||
ids = append(ids, m.ID)
|
||||
mm, ids, err2 := l.validateManifestList(ctx, manifestList, r, info)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
err = l.tx.WithTx(
|
||||
@ -675,14 +725,102 @@ func (l *manifestService) dbPutManifestList(
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = l.mapManifestList(ctx, ml.ID, manifestList, r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to map manifest list: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).Msgf("failed to create manifest list in database")
|
||||
log.Ctx(ctx).Error().Err(err).Msgf("failed to create manifest list in database: %v", err)
|
||||
return fmt.Errorf("failed to create manifest list in database: %w", err)
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *manifestService) validateManifestIndex(
|
||||
ctx context.Context, manifestList *ocischema.DeserializedImageIndex, r *types.Registry, info pkg.RegistryInfo,
|
||||
) ([]*types.Manifest, []int64, error) {
|
||||
mm := make([]*types.Manifest, 0, len(manifestList.Manifests))
|
||||
ids := make([]int64, 0, len(manifestList.Manifests))
|
||||
for _, desc := range manifestList.Manifests {
|
||||
m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest)
|
||||
if errors.Is(err, gitnessstore.ErrResourceNotFound) && r.Type == artifact.RegistryTypeUPSTREAM {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
mm = append(mm, m)
|
||||
ids = append(ids, m.ID)
|
||||
}
|
||||
log.Ctx(ctx).Debug().Msgf("validated %d / %d manifests in index", len(mm), len(manifestList.Manifests))
|
||||
return mm, ids, nil
|
||||
}
|
||||
|
||||
func (l *manifestService) mapManifestIndex(
|
||||
ctx context.Context, mi int64, manifestList *ocischema.DeserializedImageIndex, r *types.Registry,
|
||||
) error {
|
||||
if r.Type != artifact.RegistryTypeUPSTREAM {
|
||||
return nil
|
||||
}
|
||||
for _, desc := range manifestList.Manifests {
|
||||
err := l.ociImageIndexMappingDao.Create(ctx, &types.OCIImageIndexMapping{
|
||||
ParentManifestID: mi,
|
||||
ChildManifestDigest: desc.Digest,
|
||||
})
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).
|
||||
Msgf("failed to create oci image index manifest for digest %s", desc.Digest)
|
||||
return fmt.Errorf("failed to create oci image index manifest: %w", err)
|
||||
}
|
||||
}
|
||||
log.Ctx(ctx).Debug().Msgf("successfully mapped manifest index %d with its manifests", mi)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *manifestService) validateManifestList(
|
||||
ctx context.Context, manifestList *manifestlist.DeserializedManifestList, r *types.Registry, info pkg.RegistryInfo,
|
||||
) ([]*types.Manifest, []int64, error) {
|
||||
mm := make([]*types.Manifest, 0, len(manifestList.Manifests))
|
||||
ids := make([]int64, 0, len(manifestList.Manifests))
|
||||
for _, desc := range manifestList.Manifests {
|
||||
m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest)
|
||||
if errors.Is(err, gitnessstore.ErrResourceNotFound) && r.Type == artifact.RegistryTypeUPSTREAM {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
mm = append(mm, m)
|
||||
ids = append(ids, m.ID)
|
||||
}
|
||||
log.Ctx(ctx).Debug().Msgf("validated %d / %d manifests in list", len(mm), len(manifestList.Manifests))
|
||||
return mm, ids, nil
|
||||
}
|
||||
|
||||
func (l *manifestService) mapManifestList(
|
||||
ctx context.Context, mi int64, manifestList *manifestlist.DeserializedManifestList, r *types.Registry,
|
||||
) error {
|
||||
if r.Type != artifact.RegistryTypeUPSTREAM {
|
||||
return nil
|
||||
}
|
||||
for _, desc := range manifestList.Manifests {
|
||||
err := l.ociImageIndexMappingDao.Create(ctx, &types.OCIImageIndexMapping{
|
||||
ParentManifestID: mi,
|
||||
ChildManifestDigest: desc.Digest,
|
||||
})
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error().Err(err).Msgf("failed to create oci image index manifest for digest %s", desc.Digest)
|
||||
return fmt.Errorf("failed to create oci image index manifest: %w", err)
|
||||
}
|
||||
}
|
||||
log.Ctx(ctx).Debug().Msgf("successfully mapped manifest list %d with its manifests", mi)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *manifestService) dbPutImageIndex(
|
||||
@ -738,15 +876,9 @@ func (l *manifestService) dbPutImageIndex(
|
||||
return subjectHandlingError
|
||||
}
|
||||
|
||||
mm := make([]*types.Manifest, 0, len(imageIndex.Manifests))
|
||||
ids := make([]int64, 0, len(mm))
|
||||
for _, desc := range imageIndex.Manifests {
|
||||
m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mm = append(mm, m)
|
||||
ids = append(ids, m.ID)
|
||||
mm, ids, err := l.validateManifestIndex(ctx, imageIndex, r, info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to map manifest index: %w", err)
|
||||
}
|
||||
|
||||
err = l.tx.WithTx(
|
||||
@ -792,6 +924,11 @@ func (l *manifestService) dbPutImageIndex(
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = l.mapManifestIndex(ctx, mi.ID, imageIndex, r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to map manifest index: %w", err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
@ -843,8 +980,8 @@ func (l *manifestService) dbFindManifestListManifest(
|
||||
if err != nil {
|
||||
if errors.Is(err, gitnessstore.ErrResourceNotFound) {
|
||||
return nil, fmt.Errorf(
|
||||
"manifest %s not found for %s/%s", digest.String(),
|
||||
repository.Name, imageName,
|
||||
"manifest %s not found for %s/%s: %w", digest.String(),
|
||||
repository.Name, imageName, err,
|
||||
)
|
||||
}
|
||||
return nil, err
|
||||
|
@ -21,12 +21,16 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/harness/gitness/app/api/request"
|
||||
store2 "github.com/harness/gitness/app/store"
|
||||
"github.com/harness/gitness/registry/app/api/openapi/contracts/artifact"
|
||||
"github.com/harness/gitness/registry/app/common/lib/errors"
|
||||
"github.com/harness/gitness/registry/app/manifest"
|
||||
"github.com/harness/gitness/registry/app/manifest/manifestlist"
|
||||
"github.com/harness/gitness/registry/app/manifest/schema2"
|
||||
"github.com/harness/gitness/registry/app/pkg"
|
||||
"github.com/harness/gitness/registry/app/pkg/commons"
|
||||
proxy2 "github.com/harness/gitness/registry/app/remote/controller/proxy"
|
||||
@ -49,14 +53,26 @@ const (
|
||||
|
||||
func NewRemoteRegistry(
|
||||
local *LocalRegistry, app *App, upstreamProxyConfigRepo store.UpstreamProxyConfigRepository,
|
||||
spacePathStore store2.SpacePathStore, secretService secret.Service,
|
||||
spacePathStore store2.SpacePathStore, secretService secret.Service, proxyCtl proxy2.Controller,
|
||||
) Registry {
|
||||
cache := proxy2.GetManifestCache(local, local.ms)
|
||||
listCache := proxy2.GetManifestListCache(local)
|
||||
|
||||
registry := map[string]proxy2.ManifestCacheHandler{
|
||||
manifestlist.MediaTypeManifestList: listCache,
|
||||
v1.MediaTypeImageIndex: listCache,
|
||||
schema2.MediaTypeManifest: cache,
|
||||
proxy2.DefaultHandler: cache,
|
||||
}
|
||||
|
||||
return &RemoteRegistry{
|
||||
local: local,
|
||||
App: app,
|
||||
upstreamProxyConfigRepo: upstreamProxyConfigRepo,
|
||||
spacePathStore: spacePathStore,
|
||||
secretService: secretService,
|
||||
manifestCacheHandlerMap: registry,
|
||||
proxyCtl: proxyCtl,
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,21 +86,42 @@ type RemoteRegistry struct {
|
||||
upstreamProxyConfigRepo store.UpstreamProxyConfigRepository
|
||||
spacePathStore store2.SpacePathStore
|
||||
secretService secret.Service
|
||||
proxyCtl proxy2.Controller
|
||||
manifestCacheHandlerMap map[string]proxy2.ManifestCacheHandler
|
||||
}
|
||||
|
||||
func (r *RemoteRegistry) Base() error {
|
||||
panic("Not implemented yet, will be done during Replication flows")
|
||||
}
|
||||
|
||||
func defaultLibrary() (bool, string, error) {
|
||||
// get upstream Repository and check if the path contains library prefix. If yes, redirect to the correct path without
|
||||
// library prefix.
|
||||
return false, "", nil
|
||||
// defaultLibrary checks if we need to append "library/" to dockerhub images. For example, if the image is
|
||||
// "alpine" then we need to append "library/alpine" to the image.
|
||||
func (r *RemoteRegistry) defaultLibrary(ctx context.Context, artInfo pkg.RegistryInfo) (bool, error) {
|
||||
upstreamProxy, err := r.upstreamProxyConfigRepo.GetByRegistryIdentifier(
|
||||
ctx, artInfo.ParentID, artInfo.RegIdentifier,
|
||||
)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if upstreamProxy.Source != string(artifact.UpstreamConfigSourceDockerhub) {
|
||||
log.Ctx(ctx).Debug().Msg("upstream proxy source is not Dockerhub")
|
||||
return false, nil
|
||||
}
|
||||
if strings.Contains(artInfo.Image, "/") {
|
||||
log.Ctx(ctx).Debug().Msgf("image name %s contains a slash", artInfo.Image)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// defaultManifestURL return the real url for request with default project.
|
||||
func defaultManifestURL(regIdentifier string, name string, a pkg.RegistryInfo) string {
|
||||
return fmt.Sprintf("/v2/%s/library/%s/manifests/%s", regIdentifier, name, a.Reference)
|
||||
func defaultManifestURL(rootIdentifier string, regIdentifier string, name string, a pkg.RegistryInfo) string {
|
||||
return fmt.Sprintf("/v2/%s/%s/library/%s/manifests/%s", rootIdentifier, regIdentifier, name, a.Reference)
|
||||
}
|
||||
|
||||
// defaultBlobURL return the real url for request with default project.
|
||||
func defaultBlobURL(rootIdentifier string, regIdentifier string, name string, digest string) string {
|
||||
return fmt.Sprintf("/v2/%s/%s/library/%s/blobs/%s", rootIdentifier, regIdentifier, name, digest)
|
||||
}
|
||||
|
||||
func proxyManifestHead(
|
||||
@ -106,6 +143,7 @@ func proxyManifestHead(
|
||||
return errors.NotFoundError(fmt.Errorf("the tag %v:%v is not found", art.Image, art.Tag))
|
||||
}
|
||||
|
||||
// This goRoutine is to update the tag of recently pulled manifest if required
|
||||
if len(art.Tag) > 0 {
|
||||
go func(art pkg.RegistryInfo) {
|
||||
// Write function to update local storage.
|
||||
@ -119,12 +157,17 @@ func proxyManifestHead(
|
||||
for i := 0; i < ensureTagMaxRetry; i++ {
|
||||
time.Sleep(ensureTagInterval)
|
||||
count++
|
||||
log.Ctx(ctx).Info().Msgf("Ensure tag: %s for image: %s, retry: %d", tag, info.Image, count)
|
||||
log.Ctx(ctx2).Info().Str("goRoutine", "EnsureTag").Msgf("Tag %s for image: %s, retry: %d", tag,
|
||||
info.Image,
|
||||
count)
|
||||
e := ctl.EnsureTag(ctx2, responseHeaders, art, acceptHeaders, ifNoneMatchHeader)
|
||||
if e != nil {
|
||||
log.Ctx(ctx).Warn().Err(e).Msgf("Failed to update tag: ")
|
||||
log.Ctx(ctx2).Warn().Str("goRoutine",
|
||||
"EnsureTag").Err(e).Msgf("Failed to update tag: %s for image: %s",
|
||||
tag, info.Image)
|
||||
} else {
|
||||
log.Ctx(ctx).Info().Msgf("Tag updated: %s for image: %s", tag, info.Image)
|
||||
log.Ctx(ctx2).Info().Str("goRoutine", "EnsureTag").Msgf("Tag updated: %s for image: %s", tag,
|
||||
info.Image)
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -147,20 +190,19 @@ func (r *RemoteRegistry) ManifestExist(
|
||||
responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifestResult manifest.Manifest,
|
||||
errs []error,
|
||||
) {
|
||||
proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore)
|
||||
responseHeaders = &commons.ResponseHeaders{
|
||||
Headers: make(map[string]string),
|
||||
}
|
||||
defaultProj, name, err := defaultLibrary()
|
||||
isDefault, err := r.defaultLibrary(ctx, artInfo)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return responseHeaders, descriptor, manifestResult, errs
|
||||
}
|
||||
registryInfo := artInfo
|
||||
if defaultProj {
|
||||
if isDefault {
|
||||
responseHeaders.Code = http.StatusMovedPermanently
|
||||
responseHeaders.Headers = map[string]string{
|
||||
"Location": defaultManifestURL(artInfo.RegIdentifier, name, registryInfo),
|
||||
"Location": defaultManifestURL(artInfo.RootIdentifier, artInfo.RegIdentifier, artInfo.Image, registryInfo),
|
||||
}
|
||||
return responseHeaders, descriptor, manifestResult, errs
|
||||
}
|
||||
@ -183,7 +225,7 @@ func (r *RemoteRegistry) ManifestExist(
|
||||
errs = append(errs, errors.New("Proxy is down"))
|
||||
return responseHeaders, descriptor, manifestResult, errs
|
||||
}
|
||||
useLocal, man, err := proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader)
|
||||
useLocal, man, err := r.proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader)
|
||||
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
@ -210,7 +252,7 @@ func (r *RemoteRegistry) ManifestExist(
|
||||
err = proxyManifestHead(
|
||||
ctx,
|
||||
responseHeaders,
|
||||
proxyCtl,
|
||||
r.proxyCtl,
|
||||
registryInfo,
|
||||
remoteHelper,
|
||||
artInfo,
|
||||
@ -237,20 +279,19 @@ func (r *RemoteRegistry) PullManifest(
|
||||
responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifestResult manifest.Manifest,
|
||||
errs []error,
|
||||
) {
|
||||
proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore)
|
||||
responseHeaders = &commons.ResponseHeaders{
|
||||
Headers: make(map[string]string),
|
||||
}
|
||||
defaultProj, name, err := defaultLibrary()
|
||||
isDefault, err := r.defaultLibrary(ctx, artInfo)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return responseHeaders, descriptor, manifestResult, errs
|
||||
}
|
||||
registryInfo := artInfo
|
||||
if defaultProj {
|
||||
if isDefault {
|
||||
responseHeaders.Code = http.StatusMovedPermanently
|
||||
responseHeaders.Headers = map[string]string{
|
||||
"Location": defaultManifestURL(artInfo.RegIdentifier, name, registryInfo),
|
||||
"Location": defaultManifestURL(artInfo.RootIdentifier, artInfo.RegIdentifier, artInfo.Image, registryInfo),
|
||||
}
|
||||
return responseHeaders, descriptor, manifestResult, errs
|
||||
}
|
||||
@ -272,7 +313,7 @@ func (r *RemoteRegistry) PullManifest(
|
||||
errs = append(errs, errors.New("Proxy is down"))
|
||||
return responseHeaders, descriptor, manifestResult, errs
|
||||
}
|
||||
useLocal, man, err := proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader)
|
||||
useLocal, man, err := r.proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader)
|
||||
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
@ -304,7 +345,7 @@ func (r *RemoteRegistry) PullManifest(
|
||||
manifestResult, err = proxyManifestGet(
|
||||
ctx,
|
||||
responseHeaders,
|
||||
proxyCtl,
|
||||
r.proxyCtl,
|
||||
registryInfo,
|
||||
remoteHelper,
|
||||
artInfo.RegIdentifier,
|
||||
@ -352,7 +393,6 @@ func (r *RemoteRegistry) fetchBlobInternal(
|
||||
responseHeaders *commons.ResponseHeaders, fr *storage.FileReader, size int64, readCloser io.ReadCloser,
|
||||
redirectURL string, errs []error,
|
||||
) {
|
||||
proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore)
|
||||
responseHeaders = &commons.ResponseHeaders{
|
||||
Headers: make(map[string]string),
|
||||
}
|
||||
@ -360,7 +400,7 @@ func (r *RemoteRegistry) fetchBlobInternal(
|
||||
log.Ctx(ctx).Info().Msgf("Proxy: %s", repoKey)
|
||||
|
||||
// Handle dockerhub request without library prefix.
|
||||
isDefault, name, err := defaultLibrary()
|
||||
isDefault, err := r.defaultLibrary(ctx, info)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return responseHeaders, fr, size, readCloser, redirectURL, errs
|
||||
@ -369,7 +409,7 @@ func (r *RemoteRegistry) fetchBlobInternal(
|
||||
if isDefault {
|
||||
responseHeaders.Code = http.StatusMovedPermanently
|
||||
responseHeaders.Headers = map[string]string{
|
||||
"Location": defaultManifestURL(repoKey, name, registryInfo),
|
||||
"Location": defaultBlobURL(info.RootIdentifier, repoKey, info.Image, info.Digest),
|
||||
}
|
||||
return responseHeaders, fr, size, readCloser, redirectURL, errs
|
||||
}
|
||||
@ -378,7 +418,7 @@ func (r *RemoteRegistry) fetchBlobInternal(
|
||||
errs = append(errs, errors.New("Blob not found"))
|
||||
}
|
||||
|
||||
if proxyCtl.UseLocalBlob(ctx, registryInfo) {
|
||||
if r.proxyCtl.UseLocalBlob(ctx, registryInfo) {
|
||||
switch method {
|
||||
case http.MethodGet:
|
||||
headers, reader, s, closer, url, e := r.local.GetBlob(ctx, info)
|
||||
@ -398,7 +438,7 @@ func (r *RemoteRegistry) fetchBlobInternal(
|
||||
}
|
||||
|
||||
// This is start of proxy Code.
|
||||
size, readCloser, err = proxyCtl.ProxyBlob(ctx, registryInfo, repoKey, *upstreamProxy)
|
||||
size, readCloser, err = r.proxyCtl.ProxyBlob(ctx, registryInfo, repoKey, *upstreamProxy)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return responseHeaders, fr, size, readCloser, redirectURL, errs
|
||||
|
@ -19,7 +19,10 @@ import (
|
||||
gitnessstore "github.com/harness/gitness/app/store"
|
||||
storagedriver "github.com/harness/gitness/registry/app/driver"
|
||||
"github.com/harness/gitness/registry/app/event"
|
||||
"github.com/harness/gitness/registry/app/manifest/manifestlist"
|
||||
"github.com/harness/gitness/registry/app/manifest/schema2"
|
||||
"github.com/harness/gitness/registry/app/pkg"
|
||||
proxy2 "github.com/harness/gitness/registry/app/remote/controller/proxy"
|
||||
"github.com/harness/gitness/registry/app/storage"
|
||||
"github.com/harness/gitness/registry/app/store"
|
||||
"github.com/harness/gitness/registry/gc"
|
||||
@ -28,6 +31,7 @@ import (
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
"github.com/google/wire"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func LocalRegistryProvider(
|
||||
@ -51,18 +55,21 @@ func ManifestServiceProvider(
|
||||
manifestRefDao store.ManifestReferenceRepository, tagDao store.TagRepository, imageDao store.ImageRepository,
|
||||
artifactDao store.ArtifactRepository, layerDao store.LayerRepository,
|
||||
gcService gc.Service, tx dbtx.Transactor, reporter event.Reporter, spacePathStore gitnessstore.SpacePathStore,
|
||||
ociImageIndexMappingDao store.OCIImageIndexMappingRepository,
|
||||
) ManifestService {
|
||||
return NewManifestService(
|
||||
registryDao, manifestDao, blobRepo, mtRepository, tagDao, imageDao,
|
||||
artifactDao, layerDao, manifestRefDao, tx, gcService, reporter, spacePathStore,
|
||||
ociImageIndexMappingDao,
|
||||
)
|
||||
}
|
||||
|
||||
func RemoteRegistryProvider(
|
||||
local *LocalRegistry, app *App, upstreamProxyConfigRepo store.UpstreamProxyConfigRepository,
|
||||
spacePathStore gitnessstore.SpacePathStore, secretService secret.Service,
|
||||
spacePathStore gitnessstore.SpacePathStore, secretService secret.Service, proxyCtrl proxy2.Controller,
|
||||
) *RemoteRegistry {
|
||||
return NewRemoteRegistry(local, app, upstreamProxyConfigRepo, spacePathStore, secretService).(*RemoteRegistry)
|
||||
return NewRemoteRegistry(local, app, upstreamProxyConfigRepo, spacePathStore, secretService,
|
||||
proxyCtrl).(*RemoteRegistry)
|
||||
}
|
||||
|
||||
func ControllerProvider(
|
||||
@ -83,8 +90,31 @@ func ProvideReporter() event.Reporter {
|
||||
return &event.Noop{}
|
||||
}
|
||||
|
||||
func ProvideProxyController(
|
||||
registry *LocalRegistry, ms ManifestService, secretService secret.Service,
|
||||
spacePathStore gitnessstore.SpacePathStore,
|
||||
) proxy2.Controller {
|
||||
manifestCacheHandler := getManifestCacheHandler(registry, ms)
|
||||
return proxy2.NewProxyController(registry, ms, secretService, spacePathStore, manifestCacheHandler)
|
||||
}
|
||||
|
||||
func getManifestCacheHandler(
|
||||
registry *LocalRegistry, ms ManifestService,
|
||||
) map[string]proxy2.ManifestCacheHandler {
|
||||
cache := proxy2.GetManifestCache(registry, ms)
|
||||
listCache := proxy2.GetManifestListCache(registry)
|
||||
|
||||
return map[string]proxy2.ManifestCacheHandler{
|
||||
manifestlist.MediaTypeManifestList: listCache,
|
||||
v1.MediaTypeImageIndex: listCache,
|
||||
schema2.MediaTypeManifest: cache,
|
||||
proxy2.DefaultHandler: cache,
|
||||
}
|
||||
}
|
||||
|
||||
var ControllerSet = wire.NewSet(ControllerProvider)
|
||||
var RegistrySet = wire.NewSet(LocalRegistryProvider, ManifestServiceProvider, RemoteRegistryProvider)
|
||||
var ProxySet = wire.NewSet(ProvideProxyController)
|
||||
var StorageServiceSet = wire.NewSet(StorageServiceProvider)
|
||||
var AppSet = wire.NewSet(NewApp)
|
||||
var WireSet = wire.NewSet(ControllerSet, RegistrySet, StorageServiceSet, AppSet)
|
||||
var WireSet = wire.NewSet(ControllerSet, RegistrySet, StorageServiceSet, AppSet, ProxySet)
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/harness/gitness/app/api/request"
|
||||
@ -41,18 +40,13 @@ import (
|
||||
|
||||
const (
|
||||
// wait more time than manifest (maxManifestWait) because manifest list depends on manifest ready.
|
||||
maxManifestListWait = 20
|
||||
maxManifestWait = 10
|
||||
sleepIntervalSec = 20
|
||||
maxManifestWait = 10
|
||||
maxManifestMappingWait = 10
|
||||
maxManifestMappingIntervalSec = 10
|
||||
sleepIntervalSec = 20
|
||||
// keep manifest list in cache for one week.
|
||||
)
|
||||
|
||||
var (
|
||||
// Ctl is a global proxy controller instance.
|
||||
ctl Controller
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
// Controller defines the operations related with pull through proxy.
|
||||
type Controller interface {
|
||||
// UseLocalBlob check if the blob should use localRegistry copy.
|
||||
@ -94,29 +88,25 @@ type Controller interface {
|
||||
}
|
||||
|
||||
type controller struct {
|
||||
localRegistry registryInterface
|
||||
localManifestRegistry registryManifestInterface
|
||||
secretService secret.Service
|
||||
spacePathStore store.SpacePathStore
|
||||
localRegistry registryInterface
|
||||
localManifestRegistry registryManifestInterface
|
||||
secretService secret.Service
|
||||
spacePathStore store.SpacePathStore
|
||||
manifestCacheHandlerMap map[string]ManifestCacheHandler
|
||||
}
|
||||
|
||||
// ControllerInstance -- get the proxy controller instance.
|
||||
func ControllerInstance(
|
||||
// NewProxyController -- get the proxy controller instance.
|
||||
func NewProxyController(
|
||||
l registryInterface, lm registryManifestInterface, secretService secret.Service,
|
||||
spacePathStore store.SpacePathStore,
|
||||
spacePathStore store.SpacePathStore, manifestCacheHandlerMap map[string]ManifestCacheHandler,
|
||||
) Controller {
|
||||
once.Do(
|
||||
func() {
|
||||
ctl = &controller{
|
||||
localRegistry: l,
|
||||
localManifestRegistry: lm,
|
||||
secretService: secretService,
|
||||
spacePathStore: spacePathStore,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
return ctl
|
||||
return &controller{
|
||||
localRegistry: l,
|
||||
localManifestRegistry: lm,
|
||||
secretService: secretService,
|
||||
spacePathStore: spacePathStore,
|
||||
manifestCacheHandlerMap: manifestCacheHandlerMap,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) EnsureTag(
|
||||
@ -133,7 +123,7 @@ func (c *controller) EnsureTag(
|
||||
return err[0]
|
||||
}
|
||||
|
||||
//Fixme: Need to properly pick tag.
|
||||
// Fixme: Need to properly pick tag.
|
||||
e := c.localManifestRegistry.DBTag(ctx, mfst, desc.Digest, info.Reference, info.RegIdentifier, rsHeaders, info)
|
||||
if e != nil {
|
||||
log.Error().Err(e).Msgf("Error in ensuring tag: %s", e)
|
||||
@ -147,7 +137,7 @@ func (c *controller) UseLocalBlob(ctx context.Context, art pkg.RegistryInfo) boo
|
||||
}
|
||||
// TODO: Get from Local storage.
|
||||
_, _, _, _, _, e := c.localRegistry.GetBlob(ctx, art)
|
||||
return e == nil
|
||||
return len(e) == 0
|
||||
}
|
||||
|
||||
// ManifestList ...
|
||||
@ -177,16 +167,18 @@ func (c *controller) UseLocalManifest(
|
||||
|
||||
remoteRepo := getRemoteRepo(art)
|
||||
exist, desc, err := remote.ManifestExist(remoteRepo, getReference(art)) // HEAD.
|
||||
log.Info().Msgf("Manifest exist: %t %s %d %s", exist, desc.Digest.String(), desc.Size, desc.MediaType)
|
||||
// TODO: Check for rate limit error.
|
||||
if err != nil {
|
||||
if errors.IsRateLimitError(err) { // if rate limit, use localRegistry if it exists, otherwise return error.
|
||||
log.Ctx(ctx).Warn().Msgf("Rate limit error: %v", err)
|
||||
return true, nil, nil
|
||||
}
|
||||
log.Ctx(ctx).Warn().Msgf("Error in checking remote manifest exist: %v", err)
|
||||
return false, nil, err
|
||||
}
|
||||
log.Info().Msgf("Manifest exist: %t %s %d %s", exist, desc.Digest.String(), desc.Size, desc.MediaType)
|
||||
|
||||
// TODO: Delete if does not exist on remote.
|
||||
// TODO: Delete if does not exist on remote. Validate this
|
||||
if !exist || desc == nil {
|
||||
go func() {
|
||||
c.localRegistry.DeleteManifest(ctx, art)
|
||||
@ -194,7 +186,7 @@ func (c *controller) UseLocalManifest(
|
||||
return false, nil, errors.NotFoundError(fmt.Errorf("registry %v, tag %v not found", art.RegIdentifier, art.Tag))
|
||||
}
|
||||
|
||||
log.Info().Msgf("Manifest: %s %s", man, getReference(art))
|
||||
log.Info().Msgf("Manifest: %s", getReference(art))
|
||||
mediaType, payload, _ := man.Payload()
|
||||
|
||||
return true, &ManifestList{payload, d.Digest.String(), mediaType}, nil
|
||||
@ -228,13 +220,13 @@ func (c *controller) ProxyManifest(
|
||||
}
|
||||
return man, err
|
||||
}
|
||||
ct, payload, err := man.Payload()
|
||||
ct, _, err := man.Payload()
|
||||
log.Info().Msgf("Content type: %s", ct)
|
||||
if err != nil {
|
||||
return man, err
|
||||
}
|
||||
|
||||
// Push manifest in background.
|
||||
// This GoRoutine is to push the manifest from Remote to Local registry.
|
||||
go func(_, ct string) {
|
||||
session, _ := request.AuthSessionFrom(ctx)
|
||||
ctx2 := request.WithAuthSession(context.Background(), session)
|
||||
@ -242,10 +234,13 @@ func (c *controller) ProxyManifest(
|
||||
for n := 0; n < maxManifestWait; n++ {
|
||||
time.Sleep(sleepIntervalSec * time.Second)
|
||||
count++
|
||||
log.Info().Msgf("Current retry=%v artifact: %v:%v", count, repoKey, imageName)
|
||||
log.Ctx(ctx2).Info().Str("goRoutine", "UpdateManifest").Msgf("Current retry=%v artifact: %v:%v, digest: %s",
|
||||
count, repoKey, imageName,
|
||||
art.Digest)
|
||||
_, des, _, e := c.localRegistry.PullManifest(ctx2, art, acceptHeader, ifNoneMatchHeader)
|
||||
if e != nil {
|
||||
log.Info().Stack().Err(err).Msgf("failed to get manifest during remote cache update, error %v", err)
|
||||
if len(e) > 0 {
|
||||
log.Ctx(ctx2).Info().Str("goRoutine",
|
||||
"UpdateManifest").Stack().Err(err).Msgf("Local manifest doesn't exist, error %v", e[0])
|
||||
}
|
||||
// Push manifest to localRegistry when pull with digest, or artifact not found, or digest mismatch.
|
||||
errs := []error{}
|
||||
@ -254,17 +249,21 @@ func (c *controller) ProxyManifest(
|
||||
if len(artInfo.Digest) == 0 {
|
||||
artInfo.Digest = dig
|
||||
}
|
||||
// Push manifest to localRegistry.
|
||||
_, errs = c.localRegistry.PutManifest(ctx2, art, ct, ByteToReadCloser(payload), int64(len(payload)))
|
||||
|
||||
err = c.waitAndPushManifest(ctx2, art, ct, man)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Query artifact after push.
|
||||
if e == nil || commons.IsEmpty(errs) {
|
||||
_, _, _, err := c.localRegistry.PullManifest(ctx2, art, acceptHeader, ifNoneMatchHeader)
|
||||
if err != nil {
|
||||
log.Error().Stack().Msgf("failed to get manifest, error %v", err)
|
||||
log.Ctx(ctx2).Error().Str("goRoutine",
|
||||
"UpdateManifest").Stack().Msgf("failed to get manifest, error %v", err)
|
||||
} else {
|
||||
log.Info().Msgf(
|
||||
log.Ctx(ctx2).Info().Str("goRoutine", "UpdateManifest").Msgf(
|
||||
"Completed manifest push to localRegistry registry. Image: %s, Tag: %s, Digest: %s",
|
||||
art.Image, art.Tag, art.Digest,
|
||||
)
|
||||
@ -308,15 +307,27 @@ func (c *controller) ProxyBlob(
|
||||
return 0, nil, errcode.ErrorCodeBlobUnknown.WithDetail(art.Digest)
|
||||
}
|
||||
desc := manifest.Descriptor{Size: size, Digest: digest.Digest(art.Digest)}
|
||||
|
||||
// This GoRoutine is to push the blob from Remote to Local registry. No retry logic is defined here.
|
||||
go func(art pkg.RegistryInfo) {
|
||||
// Cloning Context.
|
||||
session, _ := request.AuthSessionFrom(ctx)
|
||||
session, ok := request.AuthSessionFrom(ctx)
|
||||
if !ok {
|
||||
log.Error().Stack().Err(err).Msg("failed to get auth session from context")
|
||||
return
|
||||
}
|
||||
ctx2 := request.WithAuthSession(context.Background(), session)
|
||||
ctx2 = log.Ctx(ctx2).With().
|
||||
Str("goRoutine", "AddBlob").
|
||||
Logger().WithContext(ctx2)
|
||||
err := c.putBlobToLocal(ctx2, art, remoteImage, repoKey, desc, rHelper)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("error while putting blob to localRegistry registry, %v", err)
|
||||
log.Ctx(ctx2).Error().Str("goRoutine",
|
||||
"AddBlob").Stack().Err(err).Msgf("error while putting blob to localRegistry registry, %v", err)
|
||||
return
|
||||
}
|
||||
log.Info().Msgf("Successfully updated the cache for digest %s", art.Digest)
|
||||
log.Ctx(ctx2).Info().Str("goRoutine", "AddBlob").Msgf("Successfully updated the cache for digest %s",
|
||||
art.Digest)
|
||||
}(art)
|
||||
return size, bReader, nil
|
||||
}
|
||||
@ -358,6 +369,24 @@ func (c *controller) putBlobToLocal(
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *controller) waitAndPushManifest(
|
||||
ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest,
|
||||
) error {
|
||||
h, ok := c.manifestCacheHandlerMap[contentType]
|
||||
if !ok {
|
||||
h, ok = c.manifestCacheHandlerMap[DefaultHandler]
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get default manifest cache handler")
|
||||
}
|
||||
}
|
||||
err := h.CacheContent(ctx, art, contentType, man)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("Error in caching manifest: %s", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getRemoteRepo(art pkg.RegistryInfo) string {
|
||||
return art.Image
|
||||
}
|
||||
@ -368,3 +397,112 @@ func getReference(art pkg.RegistryInfo) string {
|
||||
}
|
||||
return art.Tag
|
||||
}
|
||||
|
||||
const DefaultHandler = "default"
|
||||
|
||||
// ManifestCache default Manifest handler.
|
||||
type ManifestCache struct {
|
||||
localRegistry registryInterface
|
||||
localManifestRegistry registryManifestInterface
|
||||
}
|
||||
|
||||
func GetManifestCache(localRegistry registryInterface, localManifestRegistry registryManifestInterface) *ManifestCache {
|
||||
return &ManifestCache{
|
||||
localRegistry: localRegistry,
|
||||
localManifestRegistry: localManifestRegistry,
|
||||
}
|
||||
}
|
||||
|
||||
// ManifestListCache handle Manifest list type and index type.
|
||||
type ManifestListCache struct {
|
||||
localRegistry registryInterface
|
||||
}
|
||||
|
||||
func GetManifestListCache(localRegistry registryInterface) *ManifestListCache {
|
||||
return &ManifestListCache{localRegistry: localRegistry}
|
||||
}
|
||||
|
||||
// ManifestCacheHandler define how to cache manifest content.
|
||||
type ManifestCacheHandler interface {
|
||||
// CacheContent - cache the content of the manifest
|
||||
CacheContent(ctx context.Context, art pkg.RegistryInfo, contentType string, m manifest.Manifest) error
|
||||
}
|
||||
|
||||
func (m *ManifestCache) CacheContent(
|
||||
ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest,
|
||||
) error {
|
||||
_, payload, err := man.Payload()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Push manifest to localRegistry.
|
||||
_, errs := m.localRegistry.PutManifest(ctx, art, contentType, ByteToReadCloser(payload), int64(len(payload)))
|
||||
if len(errs) > 0 {
|
||||
return errs[0]
|
||||
}
|
||||
|
||||
for n := 0; n < maxManifestMappingWait; n++ {
|
||||
time.Sleep(maxManifestMappingIntervalSec * time.Second)
|
||||
err = m.localManifestRegistry.AddManifestAssociation(ctx, art.RegIdentifier, digest.Digest(art.Digest), art)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("failed to add manifest association, error %v", err)
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
log.Ctx(ctx).Info().Msgf("Successfully cached manifest for image: %s, tag: %s, digest: %s",
|
||||
art.Image, art.Tag, art.Digest)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *ManifestListCache) CacheContent(
|
||||
ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest,
|
||||
) error {
|
||||
_, payload, err := man.Payload()
|
||||
if err != nil {
|
||||
log.Error().Msg("failed to get payload")
|
||||
return err
|
||||
}
|
||||
if len(getReference(art)) == 0 {
|
||||
log.Error().Msg("failed to get reference, reference is empty, skip to cache manifest list")
|
||||
return fmt.Errorf("failed to get reference, reference is empty, skip to cache manifest list")
|
||||
}
|
||||
// cache key should contain digest if digest exist
|
||||
if len(art.Digest) == 0 {
|
||||
art.Digest = string(digest.FromBytes(payload))
|
||||
}
|
||||
|
||||
if err = m.push(ctx, art, man, contentType); err != nil {
|
||||
log.Error().Msgf("error when push manifest list to local :%v", err)
|
||||
return err
|
||||
}
|
||||
log.Ctx(ctx).Info().Msgf("Successfully cached manifest list for image: %s, tag: %s, digest: %s",
|
||||
art.Image, art.Tag, art.Digest)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ManifestListCache) push(
|
||||
ctx context.Context, art pkg.RegistryInfo, man manifest.Manifest, contentType string,
|
||||
) error {
|
||||
if len(man.References()) == 0 {
|
||||
return errors.New("manifest list doesn't contain any pushed manifest")
|
||||
}
|
||||
_, pl, err := man.Payload()
|
||||
if err != nil {
|
||||
log.Error().Msgf("failed to get payload, error %v", err)
|
||||
return err
|
||||
}
|
||||
log.Debug().Msgf("The manifest list payload: %v", string(pl))
|
||||
newDig := digest.FromBytes(pl)
|
||||
// Because the manifest list maybe updated, need to recheck if it is exist in local
|
||||
_, descriptor, manifest2, _ := m.localRegistry.PullManifest(ctx, art, nil, nil)
|
||||
if manifest2 != nil && descriptor.Digest == newDig {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, errs := m.localRegistry.PutManifest(ctx, art, contentType, ByteToReadCloser(pl), int64(len(pl)))
|
||||
if len(errs) > 0 {
|
||||
return errs[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -86,4 +86,5 @@ type registryManifestInterface interface {
|
||||
headers *commons.ResponseHeaders,
|
||||
info pkg.RegistryInfo,
|
||||
) error
|
||||
AddManifestAssociation(ctx context.Context, repoKey string, digest digest.Digest, info pkg.RegistryInfo) error
|
||||
}
|
||||
|
@ -144,6 +144,13 @@ type ManifestReferenceRepository interface {
|
||||
) error
|
||||
}
|
||||
|
||||
type OCIImageIndexMappingRepository interface {
|
||||
Create(ctx context.Context, ociManifest *types.OCIImageIndexMapping) error
|
||||
GetAllByChildDigest(ctx context.Context, registryID int64, imageName string, childDigest types.Digest) (
|
||||
[]*types.OCIImageIndexMapping, error,
|
||||
)
|
||||
}
|
||||
|
||||
type LayerRepository interface {
|
||||
AssociateLayerBlob(ctx context.Context, m *types.Manifest, b *types.Blob) error
|
||||
}
|
||||
|
179
registry/app/store/database/oci_image_index_mapping.go
Normal file
179
registry/app/store/database/oci_image_index_mapping.go
Normal file
@ -0,0 +1,179 @@
|
||||
// Copyright 2023 Harness, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/harness/gitness/app/api/request"
|
||||
"github.com/harness/gitness/registry/app/store"
|
||||
"github.com/harness/gitness/registry/app/store/database/util"
|
||||
"github.com/harness/gitness/registry/types"
|
||||
store2 "github.com/harness/gitness/store"
|
||||
databaseg "github.com/harness/gitness/store/database"
|
||||
"github.com/harness/gitness/store/database/dbtx"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type ociImageIndexMappingDao struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewOCIImageIndexMappingDao(db *sqlx.DB) store.OCIImageIndexMappingRepository {
|
||||
return &ociImageIndexMappingDao{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
type ociImageIndexMappingDB struct {
|
||||
ID int64 `db:"oci_mapping_id"`
|
||||
ParentManifestID int64 `db:"oci_mapping_parent_manifest_id"`
|
||||
ChildDigest []byte `db:"oci_mapping_child_digest"`
|
||||
CreatedAt int64 `db:"oci_mapping_created_at"`
|
||||
UpdatedAt int64 `db:"oci_mapping_updated_at"`
|
||||
CreatedBy int64 `db:"oci_mapping_created_by"`
|
||||
UpdatedBy int64 `db:"oci_mapping_updated_by"`
|
||||
}
|
||||
|
||||
func (dao *ociImageIndexMappingDao) Create(
|
||||
ctx context.Context,
|
||||
ociManifest *types.OCIImageIndexMapping,
|
||||
) error {
|
||||
const sqlQuery = `
|
||||
INSERT INTO oci_image_index_mappings (
|
||||
oci_mapping_parent_manifest_id,
|
||||
oci_mapping_child_digest,
|
||||
oci_mapping_created_at,
|
||||
oci_mapping_updated_at,
|
||||
oci_mapping_created_by,
|
||||
oci_mapping_updated_by
|
||||
) VALUES (
|
||||
:oci_mapping_parent_manifest_id,
|
||||
:oci_mapping_child_digest,
|
||||
:oci_mapping_created_at,
|
||||
:oci_mapping_updated_at,
|
||||
:oci_mapping_created_by,
|
||||
:oci_mapping_updated_by
|
||||
) ON CONFLICT (oci_mapping_parent_manifest_id, oci_mapping_child_digest)
|
||||
DO NOTHING
|
||||
RETURNING oci_mapping_id`
|
||||
|
||||
db := dbtx.GetAccessor(ctx, dao.db)
|
||||
internalManifest := mapToInternalOCIMapping(ctx, ociManifest)
|
||||
query, args, err := db.BindNamed(sqlQuery, internalManifest)
|
||||
if err != nil {
|
||||
return databaseg.ProcessSQLErrorf(ctx, err, "Bind query failed")
|
||||
}
|
||||
|
||||
if err = db.QueryRowContext(ctx, query, args...).Scan(&ociManifest.ID); err != nil {
|
||||
err = databaseg.ProcessSQLErrorf(ctx, err, "QueryRowContext failed")
|
||||
if errors.Is(err, store2.ErrDuplicate) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("inserting OCI image index mapping: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dao *ociImageIndexMappingDao) GetAllByChildDigest(
|
||||
ctx context.Context, registryID int64, imageName string, childDigest types.Digest,
|
||||
) ([]*types.OCIImageIndexMapping, error) {
|
||||
digestBytes, err := util.GetHexDecodedBytes(string(childDigest))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get digest bytes: %w", err)
|
||||
}
|
||||
const sqlQuery = `
|
||||
SELECT
|
||||
oci_mapping_id,
|
||||
oci_mapping_parent_manifest_id,
|
||||
oci_mapping_child_digest,
|
||||
oci_mapping_created_at,
|
||||
oci_mapping_updated_at,
|
||||
oci_mapping_created_by,
|
||||
oci_mapping_updated_by
|
||||
FROM
|
||||
oci_image_index_mappings
|
||||
JOIN manifests ON manifests.manifest_id = oci_image_index_mappings.oci_mapping_parent_manifest_id
|
||||
WHERE
|
||||
manifest_registry_id = $1 AND
|
||||
manifest_image_name = $2 AND
|
||||
oci_mapping_child_digest = $3`
|
||||
|
||||
db := dbtx.GetAccessor(ctx, dao.db)
|
||||
rows, err := db.QueryxContext(ctx, sqlQuery, registryID, imageName, digestBytes)
|
||||
if err != nil || rows.Err() != nil {
|
||||
return nil, databaseg.ProcessSQLErrorf(ctx, err, "QueryxContext failed")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var manifests []*types.OCIImageIndexMapping
|
||||
for rows.Next() {
|
||||
var dbManifest ociImageIndexMappingDB
|
||||
if err := rows.StructScan(&dbManifest); err != nil {
|
||||
return nil, databaseg.ProcessSQLErrorf(ctx, err, "StructScan failed")
|
||||
}
|
||||
manifests = append(manifests, mapToExternalOCIManifest(&dbManifest))
|
||||
}
|
||||
return manifests, nil
|
||||
}
|
||||
|
||||
func mapToInternalOCIMapping(ctx context.Context, in *types.OCIImageIndexMapping) *ociImageIndexMappingDB {
|
||||
if in.CreatedAt.IsZero() {
|
||||
in.CreatedAt = time.Now()
|
||||
}
|
||||
in.UpdatedAt = time.Now()
|
||||
session, _ := request.AuthSessionFrom(ctx)
|
||||
if in.CreatedBy == 0 {
|
||||
in.CreatedBy = session.Principal.ID
|
||||
}
|
||||
in.UpdatedBy = session.Principal.ID
|
||||
childBytes, err := types.GetDigestBytes(in.ChildManifestDigest)
|
||||
if err != nil {
|
||||
log.Error().Msgf("failed to get digest bytes: %v", err)
|
||||
}
|
||||
|
||||
return &ociImageIndexMappingDB{
|
||||
ID: in.ID,
|
||||
ParentManifestID: in.ParentManifestID,
|
||||
ChildDigest: childBytes,
|
||||
CreatedAt: in.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: in.UpdatedAt.UnixMilli(),
|
||||
CreatedBy: in.CreatedBy,
|
||||
UpdatedBy: in.UpdatedBy,
|
||||
}
|
||||
}
|
||||
|
||||
func mapToExternalOCIManifest(in *ociImageIndexMappingDB) *types.OCIImageIndexMapping {
|
||||
childDgst := types.Digest(util.GetHexEncodedString(in.ChildDigest))
|
||||
parsedChildDigest, err := childDgst.Parse()
|
||||
if err != nil {
|
||||
log.Error().Msgf("failed to child parse digest: %v", err)
|
||||
}
|
||||
|
||||
return &types.OCIImageIndexMapping{
|
||||
ID: in.ID,
|
||||
ParentManifestID: in.ParentManifestID,
|
||||
ChildManifestDigest: parsedChildDigest,
|
||||
CreatedAt: time.UnixMilli(in.CreatedAt),
|
||||
UpdatedAt: time.UnixMilli(in.UpdatedAt),
|
||||
CreatedBy: in.CreatedBy,
|
||||
UpdatedBy: in.UpdatedBy,
|
||||
}
|
||||
}
|
@ -75,6 +75,10 @@ func ProvideManifestRefDao(db *sqlx.DB) store.ManifestReferenceRepository {
|
||||
return NewManifestReferenceDao(db)
|
||||
}
|
||||
|
||||
func ProvideOCIImageIndexMappingDao(db *sqlx.DB) store.OCIImageIndexMappingRepository {
|
||||
return NewOCIImageIndexMappingDao(db)
|
||||
}
|
||||
|
||||
func ProvideLayerDao(db *sqlx.DB, mtRepository store.MediaTypesRepository) store.LayerRepository {
|
||||
return NewLayersDao(db, mtRepository)
|
||||
}
|
||||
@ -93,6 +97,7 @@ var WireSet = wire.NewSet(
|
||||
ProvideManifestDao,
|
||||
ProvideCleanupPolicyDao,
|
||||
ProvideManifestRefDao,
|
||||
ProvideOCIImageIndexMappingDao,
|
||||
ProvideLayerDao,
|
||||
ProvideImageDao,
|
||||
ProvideArtifactDao,
|
||||
|
31
registry/types/oci_Image_index_mapping.go
Normal file
31
registry/types/oci_Image_index_mapping.go
Normal file
@ -0,0 +1,31 @@
|
||||
// Copyright 2023 Harness, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
type OCIImageIndexMapping struct {
|
||||
ID int64
|
||||
ParentManifestID int64
|
||||
ChildManifestDigest digest.Digest
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
CreatedBy int64
|
||||
UpdatedBy int64
|
||||
}
|
Loading…
Reference in New Issue
Block a user