diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index 49d2c151b..d7ab88a20 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -480,7 +480,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro registryBlobRepository := database2.ProvideRegistryBlobDao(db) bandwidthStatRepository := database2.ProvideBandwidthStatDao(db) downloadStatRepository := database2.ProvideDownloadStatDao(db) - localRegistry := docker.LocalRegistryProvider(app, manifestService, blobRepository, registryRepository, manifestRepository, registryBlobRepository, mediaTypesRepository, tagRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository, gcService, transactor) + localRegistry := docker.LocalRegistryProvider(app, manifestService, blobRepository, registryRepository, manifestRepository, registryBlobRepository, mediaTypesRepository, tagRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository, gcService, transactor, eventReporter) upstreamProxyConfigRepository := database2.ProvideUpstreamDao(db, registryRepository, spaceFinder) proxyController := docker.ProvideProxyController(localRegistry, manifestService, secretService, spaceFinder) remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spaceFinder, secretService, proxyController) @@ -492,7 +492,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro filemanagerApp := filemanager.NewApp(ctx, config, storageService) genericBlobRepository := database2.ProvideGenericBlobDao(db) nodesRepository := database2.ProvideNodeDao(db) - fileManager := filemanager.Provider(filemanagerApp, registryRepository, genericBlobRepository, nodesRepository, transactor) + fileManager := filemanager.Provider(filemanagerApp, registryRepository, genericBlobRepository, nodesRepository, transactor, eventReporter) cleanupPolicyRepository := 
database2.ProvideCleanupPolicyDao(db, transactor) webhooksRepository := database2.ProvideWebhookDao(db) webhooksExecutionRepository := database2.ProvideWebhookExecutionDao(db) diff --git a/registry/app/event/reporter.go b/registry/app/event/reporter.go index b66a87c66..8fb5c8986 100644 --- a/registry/app/event/reporter.go +++ b/registry/app/event/reporter.go @@ -22,6 +22,8 @@ import ( ) type PackageType int32 +type BlobAction int32 +type Provider int32 type ArtifactDetails struct { RegistryID int64 `json:"registry_id,omitempty"` @@ -30,6 +32,19 @@ type ArtifactDetails struct { PackageType PackageType `json:"package_type,omitempty"` } +// ReplicationDetails represents the ReplicationDetails message from the proto file. +type ReplicationDetails struct { + AccountID string `json:"account_id,omitempty"` + Action BlobAction `json:"action,omitempty"` + BlobID int64 `json:"blob_id,omitempty"` + GenericBlobID string `json:"generic_blob_id,omitempty"` + Path string `json:"path,omitempty"` + Provider Provider `json:"provider,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + Region string `json:"region,omitempty"` + Bucket string `json:"bucket,omitempty"` +} + // PackageType constants using iota. 
const ( PackageTypeDOCKER = iota @@ -38,6 +53,16 @@ const ( PackageTypeMAVEN ) +const ( + BlobCreate BlobAction = 0 + BlobDelete BlobAction = 1 +) + +const ( + CLOUDFLARE Provider = 0 + GCS Provider = 1 +) + var PackageTypeValue = map[string]PackageType{ string(artifact.PackageTypeDOCKER): PackageTypeDOCKER, string(artifact.PackageTypeGENERIC): PackageTypeGENERIC, @@ -45,6 +70,16 @@ var PackageTypeValue = map[string]PackageType{ string(artifact.PackageTypeMAVEN): PackageTypeMAVEN, } +var BlobActionValue = map[string]BlobAction{ + "BlobCreate": BlobCreate, + "BlobDelete": BlobDelete, +} + +var ProviderValue = map[string]Provider{ + "CLOUDFLARE": CLOUDFLARE, + "GCS": GCS, +} + // GetPackageTypeFromString returns the PackageType constant corresponding to the given string value. func GetPackageTypeFromString(value string) (PackageType, error) { if val, ok := PackageTypeValue[value]; ok { diff --git a/registry/app/event/utils.go b/registry/app/event/utils.go new file mode 100644 index 000000000..b197ef417 --- /dev/null +++ b/registry/app/event/utils.go @@ -0,0 +1,71 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package event + +import ( + "context" + "errors" + "strings" + + a "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact" + s "github.com/harness/gitness/registry/app/storage" + "github.com/harness/gitness/types" + + "github.com/rs/zerolog/log" +) + +func ReportEventAsync( + ctx context.Context, + accountID string, + reporter Reporter, + action BlobAction, + blobID int64, + genericBlobID string, + sha256 string, + conf *types.Config, +) { + var path string + var err error + switch { + case blobID != 0: + path, err = s.BlobPath(accountID, string(a.PackageTypeDOCKER), sha256) + case genericBlobID != "": + path, err = s.BlobPath(accountID, string(a.PackageTypeGENERIC), sha256) + default: + err = errors.New("blobID or genericBlobID must be set") + } + + if err != nil { + log.Error(). + Err(err). + Int64("blobID", blobID). + Str("genericBlobID", genericBlobID). + Int32("action", int32(action)). + Msg("Failed to determine blob path for event reporting") + return + } + + go reporter.ReportEvent(ctx, &ReplicationDetails{ + AccountID: accountID, + Action: action, + BlobID: blobID, + GenericBlobID: genericBlobID, + Path: path, + Provider: ProviderValue[strings.ToUpper(conf.Registry.Storage.S3Storage.Provider)], + Endpoint: conf.Registry.Storage.S3Storage.RegionEndpoint, + Region: conf.Registry.Storage.S3Storage.Region, + Bucket: conf.Registry.Storage.S3Storage.Bucket, + }, "") +} diff --git a/registry/app/pkg/docker/local.go b/registry/app/pkg/docker/local.go index 1e87b2c68..e3427877f 100644 --- a/registry/app/pkg/docker/local.go +++ b/registry/app/pkg/docker/local.go @@ -38,6 +38,7 @@ import ( "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact" "github.com/harness/gitness/registry/app/dist_temp/dcontext" "github.com/harness/gitness/registry/app/dist_temp/errcode" + "github.com/harness/gitness/registry/app/event" "github.com/harness/gitness/registry/app/manifest" "github.com/harness/gitness/registry/app/manifest/manifestlist"
"github.com/harness/gitness/registry/app/manifest/ocischema" @@ -112,7 +113,7 @@ func NewLocalRegistry( blobRepo store.BlobRepository, mtRepository store.MediaTypesRepository, tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository, bandwidthStatDao store.BandwidthStatRepository, downloadStatDao store.DownloadStatRepository, - gcService gc.Service, tx dbtx.Transactor, + gcService gc.Service, tx dbtx.Transactor, reporter event.Reporter, ) Registry { return &LocalRegistry{ App: app, @@ -129,6 +130,7 @@ func NewLocalRegistry( downloadStatDao: downloadStatDao, gcService: gcService, tx: tx, + reporter: reporter, } } @@ -147,6 +149,7 @@ type LocalRegistry struct { downloadStatDao store.DownloadStatRepository gcService gc.Service tx dbtx.Transactor + reporter event.Reporter } func (r *LocalRegistry) Base() error { @@ -1590,7 +1593,7 @@ func (r *LocalRegistry) dbBlobLinkExists( } func (r *LocalRegistry) dbPutBlobUploadComplete( - ctx context.Context, + ctx *Context, repoName string, mediaType string, digestVal string, @@ -1604,14 +1607,16 @@ func (r *LocalRegistry) dbPutBlobUploadComplete( Size: int64(size), } + var storedBlob *types.Blob + created := false err := r.tx.WithTx( - ctx, func(ctx context.Context) error { + ctx.Context, func(ctx context.Context) error { registry, err := r.registryDao.GetByParentIDAndName(ctx, info.ParentID, repoName) if err != nil { return err } - storedBlob, err := r.blobRepo.CreateOrFind(ctx, blob) + storedBlob, created, err = r.blobRepo.CreateOrFind(ctx, blob) if err != nil && !errors.Is(err, store2.ErrResourceNotFound) { return err } @@ -1634,6 +1639,12 @@ func (r *LocalRegistry) dbPutBlobUploadComplete( return fmt.Errorf("committing database transaction: %w", err) } + // Emit blob create event + if created && storedBlob != nil { + event.ReportEventAsync(ctx.Context, ctx.OciBlobStore.Path(), + r.reporter, event.BlobCreate, storedBlob.ID, + "", digestVal, r.App.Config) + } return nil } @@ -1674,6 +1685,7 @@ func (r
*LocalRegistry) dbDeleteBlob( return storage.ErrBlobUnknown } + // No need to emit blob delete event here. The GC will take care of it even in the replicated regions. return nil } diff --git a/registry/app/pkg/docker/wire.go b/registry/app/pkg/docker/wire.go index 4ff1b128e..cbca50fe2 100644 --- a/registry/app/pkg/docker/wire.go +++ b/registry/app/pkg/docker/wire.go @@ -44,11 +44,12 @@ func LocalRegistryProvider( mtRepository store.MediaTypesRepository, tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository, bandwidthStatDao store.BandwidthStatRepository, downloadStatDao store.DownloadStatRepository, - gcService gc.Service, tx dbtx.Transactor, + gcService gc.Service, tx dbtx.Transactor, reporter event.Reporter, ) *LocalRegistry { registry, ok := NewLocalRegistry( app, ms, manifestDao, registryDao, registryBlobDao, blobRepo, - mtRepository, tagDao, imageDao, artifactDao, bandwidthStatDao, downloadStatDao, gcService, tx, + mtRepository, tagDao, imageDao, artifactDao, bandwidthStatDao, downloadStatDao, + gcService, tx, reporter, ).(*LocalRegistry) if !ok { return nil diff --git a/registry/app/pkg/filemanager/file_manager.go b/registry/app/pkg/filemanager/file_manager.go index 508888a68..c513e615a 100644 --- a/registry/app/pkg/filemanager/file_manager.go +++ b/registry/app/pkg/filemanager/file_manager.go @@ -22,6 +22,7 @@ import ( "path" "strings" + "github.com/harness/gitness/registry/app/event" "github.com/harness/gitness/registry/app/storage" "github.com/harness/gitness/registry/app/store" "github.com/harness/gitness/registry/types" @@ -42,7 +43,8 @@ const ( func NewFileManager( app *App, registryDao store.RegistryRepository, genericBlobDao store.GenericBlobRepository, nodesDao store.NodesRepository, - tx dbtx.Transactor, + tx dbtx.Transactor, reporter event.Reporter, + ) FileManager { return FileManager{ App: app, @@ -50,6 +52,7 @@ func NewFileManager( genericBlobDao: genericBlobDao, nodesDao: nodesDao, tx: tx, + reporter: 
reporter, } } @@ -59,6 +62,7 @@ type FileManager struct { genericBlobDao store.GenericBlobRepository nodesDao store.NodesRepository tx dbtx.Transactor + reporter event.Reporter } func (f *FileManager) UploadFile( @@ -116,7 +120,8 @@ func (f *FileManager) UploadFile( MD5: fileInfo.MD5, Size: fileInfo.Size, } - err = f.genericBlobDao.Create(ctx, gb) + var created bool + created, err = f.genericBlobDao.Create(ctx, gb) if err != nil { log.Error().Msgf("failed to save generic blob in db with "+ "sha256 : %s, err: %s", fileInfo.Sha256, err.Error()) @@ -138,6 +143,12 @@ func (f *FileManager) UploadFile( return types.FileInfo{}, fmt.Errorf("failed to save nodes for"+ " file : %s, with path : %s, err: %w", filename, filePath, err) } + + // Emit blob create event + if created { + event.ReportEventAsync(ctx, rootIdentifier, f.reporter, event.BlobCreate, 0, blobID, fileInfo.Sha256, + f.App.Config) + } return fileInfo, nil } diff --git a/registry/app/pkg/filemanager/wire.go b/registry/app/pkg/filemanager/wire.go index 1c8cd3371..6ffd6f1d4 100644 --- a/registry/app/pkg/filemanager/wire.go +++ b/registry/app/pkg/filemanager/wire.go @@ -15,6 +15,7 @@ package filemanager import ( + "github.com/harness/gitness/registry/app/event" "github.com/harness/gitness/registry/app/store" "github.com/harness/gitness/store/database/dbtx" @@ -24,8 +25,9 @@ import ( func Provider(app *App, registryDao store.RegistryRepository, genericBlobDao store.GenericBlobRepository, nodesDao store.NodesRepository, tx dbtx.Transactor, + reporter event.Reporter, ) FileManager { - return NewFileManager(app, registryDao, genericBlobDao, nodesDao, tx) + return NewFileManager(app, registryDao, genericBlobDao, nodesDao, tx, reporter) } var AppSet = wire.NewSet(NewApp) diff --git a/registry/app/storage/paths.go b/registry/app/storage/paths.go index 18c523667..089b7e351 100644 --- a/registry/app/storage/paths.go +++ b/registry/app/storage/paths.go @@ -21,6 +21,8 @@ import ( "path" "strings" + a 
"github.com/harness/gitness/registry/app/api/openapi/contracts/artifact" + "github.com/opencontainers/go-digest" ) @@ -30,6 +32,11 @@ const ( blobs = "blobs" ) +// PackageType constants using iota. +const ( + PackageTypeDOCKER = iota +) + func pathFor(spec pathSpec) (string, error) { rootPrefix := []string{storagePathRoot} switch v := spec.(type) { @@ -182,3 +189,19 @@ func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error) return append(prefix, suffix...), nil } + +// BlobPath returns the path for a blob based on the package type. +func BlobPath(acctID string, packageType string, sha256 string) (string, error) { + acctID = strings.ToLower(acctID) + // sample = sha256:50f564aff30aeb53eb88b0eb2c2ba59878e9854681989faa5ff7396bdfaf509b + sha256 = strings.TrimPrefix(sha256, "sha256:") + sha256Prefix := sha256[:2] + + switch packageType { + case string(a.PackageTypeDOCKER): + // format: /accountId(lowercase)/docker/blobs/sha256/(2 character prefix of sha)/sha/data + return fmt.Sprintf("/%s/docker/blobs/sha256/%s/%s/data", acctID, sha256Prefix, sha256), nil + default: + return fmt.Sprintf("/%s/files/%s", acctID, sha256), nil + } +} diff --git a/registry/app/store/database.go b/registry/app/store/database.go index f56782368..055499e7f 100644 --- a/registry/app/store/database.go +++ b/registry/app/store/database.go @@ -41,7 +41,7 @@ type BlobRepository interface { ctx context.Context, d digest.Digest, repoID int64, imageName string, ) (*types.Blob, error) - CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, error) + CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, bool, error) DeleteByID(ctx context.Context, id int64) error ExistsBlob( ctx context.Context, repoID int64, d digest.Digest, @@ -574,7 +574,7 @@ type GenericBlobRepository interface { ctx context.Context, sha256 string, rootParentID int64, ) (*types.GenericBlob, error) - Create(ctx context.Context, gb *types.GenericBlob) error + Create(ctx context.Context, gb 
*types.GenericBlob) (bool, error) DeleteByID(ctx context.Context, id string) error TotalSizeByRootParentID(ctx context.Context, id int64) (int64, error) } diff --git a/registry/app/store/database/blob.go b/registry/app/store/database/blob.go index 706749c7d..cc89f858f 100644 --- a/registry/app/store/database/blob.go +++ b/registry/app/store/database/blob.go @@ -24,7 +24,6 @@ import ( "github.com/harness/gitness/registry/app/store" "github.com/harness/gitness/registry/app/store/database/util" "github.com/harness/gitness/registry/types" - store2 "github.com/harness/gitness/store" "github.com/harness/gitness/store/database" "github.com/harness/gitness/store/database/dbtx" @@ -173,16 +172,16 @@ func (bd blobDao) FindByDigestAndRepoID(ctx context.Context, d digest.Digest, re return bd.mapToBlob(dst) } -func (bd blobDao) CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, error) { +func (bd blobDao) CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, bool, error) { sqlQuery := `INSERT INTO blobs ( - blob_digest, - blob_root_parent_id, - blob_media_type_id, + blob_digest, + blob_root_parent_id, + blob_media_type_id, blob_size, blob_created_at, blob_created_by ) VALUES ( - :blob_digest, + :blob_digest, :blob_root_parent_id, :blob_media_type_id, :blob_size, @@ -190,33 +189,38 @@ func (bd blobDao) CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, :blob_created_by ) ON CONFLICT ( blob_digest, blob_root_parent_id - ) DO NOTHING + ) DO NOTHING RETURNING blob_id` mediaTypeID, err := bd.mtRepository.MapMediaType(ctx, b.MediaType) if err != nil { - return nil, err + return nil, false, err } b.MediaTypeID = mediaTypeID db := dbtx.GetAccessor(ctx, bd.db) blob, err := mapToInternalBlob(ctx, b) if err != nil { - return nil, err + return nil, false, err } query, arg, err := db.BindNamed(sqlQuery, blob) if err != nil { - return nil, database.ProcessSQLErrorf(ctx, err, "Failed to bind repo object") + return nil, false, database.ProcessSQLErrorf(ctx, 
err, "Failed to bind repo object") } + var created bool if err = db.QueryRowContext(ctx, query, arg...).Scan(&b.ID); err != nil { - err := database.ProcessSQLErrorf(ctx, err, "Insert query failed") - if !errors2.Is(err, store2.ErrResourceNotFound) { - return nil, err + if errors2.Is(err, sql.ErrNoRows) { + created = false + } else { + return nil, false, database.ProcessSQLErrorf(ctx, err, "Insert query failed") } + } else { + created = true } - return bd.FindByDigestAndRootParentID(ctx, b.Digest, b.RootParentID) + blob2, err := bd.FindByDigestAndRootParentID(ctx, b.Digest, b.RootParentID) + return blob2, created, err } func (bd blobDao) DeleteByID(ctx context.Context, id int64) error { diff --git a/registry/app/store/database/generic_blob.go b/registry/app/store/database/generic_blob.go index 875672076..eb9e65ab8 100644 --- a/registry/app/store/database/generic_blob.go +++ b/registry/app/store/database/generic_blob.go @@ -102,45 +102,45 @@ func (g GenericBlobDao) FindBySha256AndRootParentID(ctx context.Context, return g.mapToGenericBlob(ctx, dst) } -func (g GenericBlobDao) Create(ctx context.Context, gb *types.GenericBlob) error { +func (g GenericBlobDao) Create(ctx context.Context, gb *types.GenericBlob) (bool, error) { const sqlQuery = ` - INSERT INTO generic_blobs ( - generic_blob_id - ,generic_blob_root_parent_id - ,generic_blob_sha_1 - ,generic_blob_sha_256 - ,generic_blob_sha_512 - ,generic_blob_md5 - ,generic_blob_size - ,generic_blob_created_at - ,generic_blob_created_by - ) VALUES ( - :generic_blob_id - , :generic_blob_root_parent_id - ,:generic_blob_sha_1 - ,:generic_blob_sha_256 - ,:generic_blob_sha_512 - ,:generic_blob_md5 - ,:generic_blob_size - ,:generic_blob_created_at - ,:generic_blob_created_by - ) ON CONFLICT (generic_blob_root_parent_id, generic_blob_sha_256) - DO UPDATE SET generic_blob_id = generic_blobs.generic_blob_id - RETURNING generic_blob_id` + INSERT INTO generic_blobs ( + generic_blob_id, + generic_blob_root_parent_id, + 
generic_blob_sha_1, + generic_blob_sha_256, + generic_blob_sha_512, + generic_blob_md5, + generic_blob_size, + generic_blob_created_at, + generic_blob_created_by + ) VALUES ( + :generic_blob_id, + :generic_blob_root_parent_id, + :generic_blob_sha_1, + :generic_blob_sha_256, + :generic_blob_sha_512, + :generic_blob_md5, + :generic_blob_size, + :generic_blob_created_at, + :generic_blob_created_by + ) ON CONFLICT (generic_blob_root_parent_id, generic_blob_sha_256) + DO UPDATE SET generic_blob_id = generic_blobs.generic_blob_id + RETURNING generic_blob_id` db := dbtx.GetAccessor(ctx, g.sqlDB) query, arg, err := db.BindNamed(sqlQuery, g.mapToInternalGenericBlob(ctx, gb)) if err != nil { - return databaseg.ProcessSQLErrorf(ctx, err, "Failed to bind generic blob object") + return false, databaseg.ProcessSQLErrorf(ctx, err, "Failed to bind generic blob object") } if err = db.QueryRowContext(ctx, query, arg...).Scan(&gb.ID); err != nil { if errors.Is(err, sql.ErrNoRows) || errors.Is(err, store2.ErrDuplicate) { - return nil + return false, nil } - return databaseg.ProcessSQLErrorf(ctx, err, "Insert query failed") + return false, databaseg.ProcessSQLErrorf(ctx, err, "Insert query failed") } - return nil + return true, nil } func (g GenericBlobDao) DeleteByID(_ context.Context, _ string) error { diff --git a/types/config.go b/types/config.go index 54611d792..ab217bd3a 100644 --- a/types/config.go +++ b/types/config.go @@ -184,7 +184,7 @@ type Config struct { // BlobStore defines the blob storage configuration parameters. BlobStore struct { - // Provider is a name of blob storage service like filesystem or gcs + // Provider is a name of blob storage service like filesystem or gcs or cloudflare Provider blob.Provider `envconfig:"GITNESS_BLOBSTORE_PROVIDER" default:"filesystem"` // Bucket is a path to the directory where the files will be stored when using filesystem blob storage, // in case of gcs provider this will be the actual bucket where the images are stored. 
@@ -520,6 +520,7 @@ type Config struct { LogLevel string `envconfig:"GITNESS_REGISTRY_S3_LOG_LEVEL" default:"info"` Delete bool `envconfig:"GITNESS_REGISTRY_S3_DELETE_ENABLED" default:"true"` Redirect bool `envconfig:"GITNESS_REGISTRY_S3_STORAGE_REDIRECT" default:"false"` + Provider string `envconfig:"GITNESS_REGISTRY_S3_PROVIDER" default:"cloudflare"` } }