// Source: https://github.com/distribution/distribution // Copyright 2014 https://github.com/distribution/distribution Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package storage import ( "context" "errors" "fmt" "io" "path" "time" "github.com/harness/gitness/registry/app/dist_temp/dcontext" "github.com/harness/gitness/registry/app/driver" "github.com/harness/gitness/registry/app/manifest" "github.com/opencontainers/go-digest" "github.com/rs/zerolog/log" ) var errResumableDigestNotAvailable = errors.New("resumable digest not available") const ( // digestSha256Empty is the canonical sha256 digest of empty data. digestSha256Empty = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" ) // blobWriter is used to control the various aspects of resumable // blob upload. type blobWriter struct { ctx context.Context blobStore *ociBlobStore id string digester digest.Digester written int64 // track the write to digester fileWriter driver.FileWriter driver driver.StorageDriver path string resumableDigestEnabled bool committed bool } var _ BlobWriter = &blobWriter{} // ID returns the identifier for this upload. func (bw *blobWriter) ID() string { return bw.id } // Commit marks the upload as completed, returning a valid descriptor. The // final size and digest are checked against the first descriptor provided. func (bw *blobWriter) Commit(ctx context.Context, pathPrefix string, desc manifest.Descriptor) ( manifest.Descriptor, error, ) { dcontext.GetLogger(ctx, log.Debug()).Msg("(*blobWriter).Commit") if err := bw.fileWriter.Commit(ctx); err != nil { return manifest.Descriptor{}, err } bw.Close() desc.Size = bw.Size() canonical, err := bw.validateBlob(ctx, desc) if err != nil { return manifest.Descriptor{}, err } if err := bw.moveBlob(ctx, pathPrefix, canonical); err != nil { return manifest.Descriptor{}, err } if err := bw.removeResources(ctx); err != nil { return manifest.Descriptor{}, err } bw.committed = true return canonical, nil } // Cancel the blob upload process, releasing any resources associated with // the writer and canceling the operation. func (bw *blobWriter) Cancel(ctx context.Context) error { dcontext.GetLogger(ctx, log.Debug()).Msg("(*blobWriter).Cancel") if err := bw.fileWriter.Cancel(ctx); err != nil { return err } if err := bw.Close(); err != nil { dcontext.GetLogger(ctx, log.Error()).Msgf("error closing blobwriter: %s", err) } return bw.removeResources(ctx) } func (bw *blobWriter) Size() int64 { return bw.fileWriter.Size() } func (bw *blobWriter) Write(p []byte) (int, error) { // Ensure that the current write offset matches how many bytes have been // written to the digester. If not, we need to update the digest state to // match the current write position. if err := bw.resumeDigest(bw.blobStore.ctx); err != nil && !errors.Is(err, errResumableDigestNotAvailable) { return 0, err } _, err := bw.fileWriter.Write(p) if err != nil { return 0, err } n, err := bw.digester.Hash().Write(p) bw.written += int64(n) return n, err } func (bw *blobWriter) Close() error { if bw.committed { return errors.New("blobwriter close after commit") } if err := bw.storeHashState(bw.blobStore.ctx); err != nil && !errors.Is(err, errResumableDigestNotAvailable) { return err } return bw.fileWriter.Close() } // validateBlob checks the data against the digest, returning an error if it // does not match. The canonical descriptor is returned. func (bw *blobWriter) validateBlob(ctx context.Context, desc manifest.Descriptor) (manifest.Descriptor, error) { var ( verified, fullHash bool canonical digest.Digest ) if desc.Digest == "" { // if no descriptors are provided, we have nothing to validate // against. We don't really want to support this for the registry. return manifest.Descriptor{}, BlobInvalidDigestError{ Reason: fmt.Errorf("cannot validate against empty digest"), } } var size int64 // Stat the on disk file if fi, err := bw.driver.Stat(ctx, bw.path); err != nil { if errors.As(err, &driver.PathNotFoundError{}) { desc.Size = 0 } else { // Any other error we want propagated up the stack. return manifest.Descriptor{}, err } } else { if fi.IsDir() { return manifest.Descriptor{}, fmt.Errorf("unexpected directory at upload location %q", bw.path) } size = fi.Size() } if desc.Size > 0 { if desc.Size != size { return manifest.Descriptor{}, ErrBlobInvalidLength } } else { // if provided 0 or negative length, we can assume caller doesn't know or // care about length. desc.Size = size } if err := bw.resumeDigest(ctx); err == nil { canonical = bw.digester.Digest() if canonical.Algorithm() == desc.Digest.Algorithm() { // Common case: client and server prefer the same canonical digest // algorithm - currently SHA256. verified = desc.Digest == canonical } else { // The client wants to use a different digest algorithm. They'll just // have to be patient and wait for us to download and re-hash the // uploaded content using that digest algorithm. fullHash = true } } else if errors.Is(err, errResumableDigestNotAvailable) { // Not using resumable digests, so we need to hash the entire layer. fullHash = true } else { return manifest.Descriptor{}, err } if fullHash && bw.written == size && digest.Canonical == desc.Digest.Algorithm() { // a fantastic optimization: if the the written data and the size are // the same, we don't need to read the data from the backend. This is // because we've written the entire file in the lifecycle of the // current instance. canonical = bw.digester.Digest() verified = desc.Digest == canonical } if fullHash && !verified { // If the check based on size fails, we fall back to the slowest of // paths. We may be able to make the size-based check a stronger // guarantee, so this may be defensive. digester := digest.Canonical.Digester() verifier := desc.Digest.Verifier() // Read the file from the backend driver and validate it. fr, err := NewFileReader(ctx, bw.driver, bw.path, desc.Size) if err != nil { return manifest.Descriptor{}, err } defer fr.Close() tr := io.TeeReader(fr, digester.Hash()) if _, err := io.Copy(verifier, tr); err != nil { return manifest.Descriptor{}, err } canonical = digester.Digest() verified = verifier.Verified() } if !verified { dcontext.GetLoggerWithFields( ctx, log.Ctx(ctx).Error(), map[interface{}]interface{}{ "canonical": canonical, "provided": desc.Digest, }, "canonical", "provided", ). Msg("canonical digest does match provided digest") return manifest.Descriptor{}, BlobInvalidDigestError{ Digest: desc.Digest, Reason: fmt.Errorf("content does not match digest"), } } // update desc with canonical hash desc.Digest = canonical if desc.MediaType == "" { desc.MediaType = "application/octet-stream" } return desc, nil } // moveBlob moves the data into its final, hash-qualified destination, // identified by dgst. The layer should be validated before commencing the // move. func (bw *blobWriter) moveBlob(ctx context.Context, pathPrefix string, desc manifest.Descriptor) error { blobPath, err := pathFor( blobDataPathSpec{ digest: desc.Digest, path: pathPrefix, }, ) if err != nil { return err } // Check for existence if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil { log.Ctx(ctx).Info().Msgf("Error type: %T, value: %v\n", err, err) if !errors.As(err, &driver.PathNotFoundError{}) { return err } } else { // If the path exists, we can assume that the content has already // been uploaded, since the blob storage is content-addressable. // While it may be corrupted, detection of such corruption belongs // elsewhere. return nil } // If no data was received, we may not actually have a file on disk. Check // the size here and write a zero-length file to blobPath if this is the // case. For the most part, this should only ever happen with zero-length // blobs. if _, err := bw.blobStore.driver.Stat(ctx, bw.path); err != nil { if errors.As(err, &driver.PathNotFoundError{}) { if desc.Digest == digestSha256Empty { return bw.blobStore.driver.PutContent(ctx, blobPath, []byte{}) } // We let this fail during the move below. log.Ctx(ctx).Warn(). Interface("upload.id", bw.ID()). Interface("digest", desc.Digest). Msg("attempted to move zero-length content with non-zero digest") } else { return err // unrelated error } } return bw.blobStore.driver.Move(ctx, bw.path, blobPath) } // removeResources should clean up all resources associated with the upload // instance. An error will be returned if the clean up cannot proceed. If the // resources are already not present, no error will be returned. func (bw *blobWriter) removeResources(ctx context.Context) error { dataPath, err := pathFor( uploadDataPathSpec{ path: bw.blobStore.rootParentRef, repoName: bw.blobStore.repoKey, id: bw.id, }, ) if err != nil { return err } // Resolve and delete the containing directory, which should include any // upload related files. dirPath := path.Dir(dataPath) if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil { if !errors.As(err, &driver.PathNotFoundError{}) { // This should be uncommon enough such that returning an error // should be okay. At this point, the upload should be mostly // complete, but perhaps the backend became unaccessible. dcontext.GetLogger(ctx, log.Error()).Msgf("unable to delete layer upload resources %q: %v", dirPath, err) return err } } return nil } func (bw *blobWriter) Reader() (io.ReadCloser, error) { try := 1 for try <= 5 { _, err := bw.driver.Stat(bw.ctx, bw.path) if err == nil { break } if errors.As(err, &driver.PathNotFoundError{}) { dcontext.GetLogger(bw.ctx, log.Debug()).Msgf("Nothing found on try %d, sleeping...", try) time.Sleep(1 * time.Second) try++ } else { return nil, err } } readCloser, err := bw.driver.Reader(bw.ctx, bw.path, 0) if err != nil { return nil, err } return readCloser, nil }