drone/gitrpc/internal/service/upload.go
Johannes Batzill 3b120dd2b3 Add Zerolog Support to GITRPC (#126)
This change adds the following:
- Inject APP server zerolog RequestID as metadata into all gitrpc calls from client side.
- Inject Zerolog logger into context with common fields set (like service, method, requestID, ...). This allows for better request tracking within gitrpc, but also request tracking across services we extract the requestID send by the grpc client
- Modify http logs to use http. prefix for common http related logging annotations.
2022-12-16 08:39:10 -08:00

180 lines
4.4 KiB
Go

// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package service
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"path/filepath"
"time"
"github.com/harness/gitness/gitrpc/internal/types"
"github.com/harness/gitness/gitrpc/rpc"
"github.com/rs/zerolog/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TODO: this should be taken as a struct input defined in proto.
func (s RepositoryService) AddFilesAndPush(
ctx context.Context,
repoPath string,
filePaths []string,
branch string,
author *rpc.Identity,
committer *rpc.Identity,
remote string,
message string,
) error {
if author == nil || committer == nil {
return status.Errorf(codes.InvalidArgument, "both author and committer have to be provided")
}
err := s.adapter.AddFiles(repoPath, false, filePaths...)
if err != nil {
return processGitErrorf(err, "failed to add files")
}
now := time.Now()
err = s.adapter.Commit(repoPath, types.CommitChangesOptions{
// TODO: Add gitness signature
Committer: types.Signature{
Identity: types.Identity{
Name: committer.Name,
Email: committer.Email,
},
When: now,
},
// TODO: Add gitness signature
Author: types.Signature{
Identity: types.Identity{
Name: author.Name,
Email: author.Email,
},
When: now,
},
Message: message,
})
if err != nil {
return processGitErrorf(err, "failed to commit files")
}
err = s.adapter.Push(ctx, repoPath, types.PushOptions{
// TODO: Don't hard-code
Remote: remote,
Branch: branch,
Force: false,
Mirror: false,
Env: nil,
Timeout: 0,
})
if err != nil {
return processGitErrorf(err, "failed to push files")
}
return nil
}
func (s RepositoryService) handleFileUploadIfAvailable(ctx context.Context, basePath string,
nextFSElement func() (*rpc.FileUpload, error)) (string, error) {
log := log.Ctx(ctx)
log.Info().Msg("waiting to receive file upload header")
header, err := getFileStreamHeader(nextFSElement)
if err != nil {
return "", err
}
log.Info().Msgf("storing file at %s", header.Path)
// work with file content chunks
fileData := bytes.Buffer{}
fileSize := 0
for {
log.Debug().Msg("waiting to receive data")
var chunk *rpc.Chunk
chunk, err = getFileUploadChunk(nextFSElement)
if errors.Is(err, io.EOF) {
// only for a header we expect a stream EOF error (for chunk its a chunk.EOF).
return "", fmt.Errorf("data stream ended unexpectedly")
}
if err != nil {
return "", err
}
size := len(chunk.Data)
if size > 0 {
log.Debug().Msgf("received a chunk with size: %d", size)
// TODO: file size could be checked on client side?
fileSize += size
if fileSize > maxFileSize {
return "", status.Errorf(codes.InvalidArgument, "file is too large: %d > %d", fileSize, maxFileSize)
}
// TODO: write in file as we go (instead of in buffer)
_, err = fileData.Write(chunk.Data)
if err != nil {
return "", status.Errorf(codes.Internal, "cannot write chunk data: %v", err)
}
}
if chunk.Eof {
log.Info().Msg("Received file EOF")
break
}
}
fullPath := filepath.Join(basePath, header.Path)
log.Info().Msgf("saving file at path %s", fullPath)
_, err = s.store.Save(fullPath, fileData)
if err != nil {
return "", status.Errorf(codes.Internal, "cannot save file to the store: %v", err)
}
return fullPath, nil
}
func getFileStreamHeader(nextFileUpload func() (*rpc.FileUpload, error)) (*rpc.FileUploadHeader, error) {
fs, err := getFileUpload(nextFileUpload)
if err != nil {
return nil, err
}
header := fs.GetHeader()
if header == nil {
return nil, status.Errorf(codes.Internal, "file stream is in wrong order - expected header")
}
return header, nil
}
func getFileUploadChunk(nextFileUpload func() (*rpc.FileUpload, error)) (*rpc.Chunk, error) {
fs, err := getFileUpload(nextFileUpload)
if err != nil {
return nil, err
}
chunk := fs.GetChunk()
if chunk == nil {
return nil, status.Errorf(codes.Internal, "file stream is in wrong order - expected chunk")
}
return chunk, nil
}
func getFileUpload(nextFileUpload func() (*rpc.FileUpload, error)) (*rpc.FileUpload, error) {
fs, err := nextFileUpload()
if err != nil {
return nil, err
}
if fs == nil {
return nil, status.Errorf(codes.Internal, "file stream wasn't found")
}
return fs, nil
}