drone/gitrpc/internal/service/operations.go
Johannes Batzill 3b120dd2b3 Add Zerolog Support to GITRPC (#126)
This change adds the following:
- Inject APP server zerolog RequestID as metadata into all gitrpc calls from client side.
- Inject Zerolog logger into context with common fields set (like service, method, requestID, ...). This allows for better request tracking within gitrpc, but also request tracking across services we extract the requestID send by the grpc client
- Modify http logs to use http. prefix for common http related logging annotations.
2022-12-16 08:39:10 -08:00

464 lines
12 KiB
Go

// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package service
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"path"
"strings"
"github.com/harness/gitness/gitrpc/internal/files"
"github.com/harness/gitness/gitrpc/internal/slices"
"github.com/harness/gitness/gitrpc/internal/types"
"github.com/harness/gitness/gitrpc/rpc"
"code.gitea.io/gitea/modules/git"
)
const (
filePrefix = "file://"
)
type CommitFilesService struct {
rpc.UnimplementedCommitFilesServiceServer
adapter GitAdapter
reposRoot string
reposTempDir string
}
type fileAction struct {
header *rpc.CommitFilesActionHeader
// content can hold file content or new path for move operation
// new path is prefixed with filePrefix constant
content []byte
}
func NewCommitFilesService(adapter GitAdapter, reposRoot, reposTempDir string) (*CommitFilesService, error) {
return &CommitFilesService{
adapter: adapter,
reposRoot: reposRoot,
reposTempDir: reposTempDir,
}, nil
}
func (s *CommitFilesService) CommitFiles(stream rpc.CommitFilesService_CommitFilesServer) error {
ctx := stream.Context()
headerRequest, err := stream.Recv()
if err != nil {
return err
}
header := headerRequest.GetHeader()
if header == nil {
return types.ErrHeaderCannotBeEmpty
}
author, committer := GetAuthorAndCommitter(header.Author, header.Committer)
repoPath := getFullPathForRepo(s.reposRoot, header.GetRepoUid())
repo, err := git.OpenRepository(ctx, repoPath)
if err != nil {
return err
}
if err = s.validateHeader(repo, header); err != nil {
return err
}
actions := make([]fileAction, 0, 16)
if err = s.collectActions(stream, &actions); err != nil {
return err
}
// create a shared repo
shared, err := NewSharedRepo(s.reposTempDir, header.GetRepoUid(), repo)
if err != nil {
return err
}
defer shared.Close(ctx)
if err = s.clone(ctx, repo, shared, header.GetBranchName()); err != nil {
return err
}
// Get the commit of the original branch
commit, err := shared.GetBranchCommit(header.GetBranchName())
if err != nil {
return err
}
for _, action := range actions {
action := action
if err = s.processAction(ctx, shared, &action, commit); err != nil {
return err
}
}
// Now write the tree
treeHash, err := shared.WriteTree(ctx)
if err != nil {
return err
}
message := strings.TrimSpace(header.GetTitle())
if len(header.GetMessage()) > 0 {
message += "\n\n" + header.GetMessage()
}
// Now commit the tree
commitHash, err := shared.CommitTree(ctx, commit.ID.String(), author, committer, treeHash, message, false)
if err != nil {
return err
}
if err = shared.Push(ctx, author, commitHash, header.GetNewBranchName()); err != nil {
return err
}
commit, err = shared.GetCommit(commitHash)
if err != nil {
return err
}
return stream.SendAndClose(&rpc.CommitFilesResponse{
CommitId: commit.ID.String(),
})
}
func (s *CommitFilesService) validateHeader(repo *git.Repository, header *rpc.CommitFilesRequestHeader) error {
if header.GetBranchName() == "" {
branch, err := repo.GetDefaultBranch()
if err != nil {
return err
}
header.BranchName = branch
}
if header.GetNewBranchName() == "" {
header.NewBranchName = header.GetBranchName()
}
if _, err := repo.GetBranch(header.GetBranchName()); err != nil {
return err
}
if header.GetBranchName() != header.GetNewBranchName() {
existingBranch, err := repo.GetBranch(header.GetNewBranchName())
if existingBranch != nil {
return fmt.Errorf("branch %s %w", existingBranch.Name, types.ErrAlreadyExists)
}
if err != nil && !git.IsErrBranchNotExist(err) {
return err
}
}
return nil
}
func (s *CommitFilesService) clone(
ctx context.Context,
repo *git.Repository,
shared *SharedRepo,
branch string,
) error {
if err := shared.Clone(ctx, branch); err != nil {
empty, _ := repo.IsEmpty()
if !git.IsErrBranchNotExist(err) || !empty {
return err
}
if errInit := shared.Init(ctx); errInit != nil {
return errInit
}
return err
}
return shared.SetDefaultIndex(ctx)
}
func (s *CommitFilesService) collectActions(
stream rpc.CommitFilesService_CommitFilesServer,
ptrActions *[]fileAction,
) error {
if ptrActions == nil {
return nil
}
actions := *ptrActions
for {
req, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return fmt.Errorf("receive request: %w", err)
}
switch payload := req.GetAction().GetPayload().(type) {
case *rpc.CommitFilesAction_Header:
actions = append(actions, fileAction{header: payload.Header})
case *rpc.CommitFilesAction_Content:
if len(actions) == 0 {
return types.ErrContentSentBeforeAction
}
// append the content to the previous fileAction
content := &actions[len(actions)-1].content
*content = append(*content, payload.Content...)
default:
return fmt.Errorf("unhandled fileAction payload type: %T", payload)
}
}
if len(actions) == 0 {
return types.ErrActionListEmpty
}
*ptrActions = actions
return nil
}
func (s *CommitFilesService) processAction(
ctx context.Context,
shared *SharedRepo,
action *fileAction,
commit *git.Commit,
) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("in processActions: %w", err)
}
}()
header := action.header
if _, ok := rpc.CommitFilesActionHeader_ActionType_name[int32(header.Action)]; !ok {
return fmt.Errorf("%s %w", action.header.Action, types.ErrUndefinedAction)
}
filePath := files.CleanUploadFileName(header.GetPath())
if filePath == "" {
return types.ErrInvalidPath
}
mode := "100644" // 0o644 default file permission
reader := bytes.NewReader(action.content)
switch header.Action {
case rpc.CommitFilesActionHeader_CREATE:
err = createFile(ctx, shared, commit, filePath, mode, reader)
case rpc.CommitFilesActionHeader_UPDATE:
err = updateFile(ctx, shared, commit, filePath, header.GetSha(), mode, reader)
case rpc.CommitFilesActionHeader_MOVE:
err = moveFile(ctx, shared, commit, filePath, mode, reader)
case rpc.CommitFilesActionHeader_DELETE:
err = deleteFile(ctx, shared, filePath)
}
return err
}
func createFile(ctx context.Context, repo *SharedRepo, commit *git.Commit, filePath,
mode string, reader io.Reader) error {
if err := checkPath(commit, filePath, true); err != nil {
return err
}
filesInIndex, err := repo.LsFiles(ctx, filePath)
if err != nil {
return fmt.Errorf("listing files error %w", err)
}
if slices.Contains(filesInIndex, filePath) {
return fmt.Errorf("%s %w", filePath, types.ErrAlreadyExists)
}
hash, err := repo.HashObject(ctx, reader)
if err != nil {
return fmt.Errorf("error hashing object %w", err)
}
// Add the object to the index
if err = repo.AddObjectToIndex(ctx, mode, hash, filePath); err != nil {
return fmt.Errorf("error creating object: %w", err)
}
return nil
}
func updateFile(ctx context.Context, repo *SharedRepo, commit *git.Commit, filePath, sha,
mode string, reader io.Reader) error {
filesInIndex, err := repo.LsFiles(ctx, filePath)
if err != nil {
return fmt.Errorf("listing files error %w", err)
}
if !slices.Contains(filesInIndex, filePath) {
return fmt.Errorf("%s %w", filePath, types.ErrNotFound)
}
if commit != nil {
var entry *git.TreeEntry
entry, err = getFileEntry(commit, sha, filePath)
if err != nil {
return err
}
if entry.IsExecutable() {
mode = "100755"
}
}
hash, err := repo.HashObject(ctx, reader)
if err != nil {
return fmt.Errorf("error hashing object %w", err)
}
if err = repo.AddObjectToIndex(ctx, mode, hash, filePath); err != nil {
return fmt.Errorf("error updating object: %w", err)
}
return nil
}
func moveFile(ctx context.Context, repo *SharedRepo, commit *git.Commit,
filePath, mode string, reader io.Reader) error {
buffer := &bytes.Buffer{}
newPath, err := parsePayload(reader, buffer)
if err != nil {
return err
}
if buffer.Len() == 0 && newPath != "" {
err = repo.ShowFile(ctx, filePath, commit.ID.String(), buffer)
if err != nil {
return err
}
}
if err = checkPath(commit, newPath, false); err != nil {
return err
}
filesInIndex, err := repo.LsFiles(ctx, filePath)
if err != nil {
return fmt.Errorf("listing files error %w", err)
}
if !slices.Contains(filesInIndex, filePath) {
return fmt.Errorf("%s %w", filePath, types.ErrNotFound)
}
if slices.Contains(filesInIndex, newPath) {
return fmt.Errorf("%s %w", filePath, types.ErrAlreadyExists)
}
hash, err := repo.HashObject(ctx, buffer)
if err != nil {
return fmt.Errorf("error hashing object %w", err)
}
if err = repo.AddObjectToIndex(ctx, mode, hash, newPath); err != nil {
return fmt.Errorf("created object: %w", err)
}
if err = repo.RemoveFilesFromIndex(ctx, filePath); err != nil {
return fmt.Errorf("remove object: %w", err)
}
return nil
}
func deleteFile(ctx context.Context, repo *SharedRepo, filePath string) error {
filesInIndex, err := repo.LsFiles(ctx, filePath)
if err != nil {
return fmt.Errorf("listing files error %w", err)
}
if !slices.Contains(filesInIndex, filePath) {
return fmt.Errorf("%s %w", filePath, types.ErrNotFound)
}
if err = repo.RemoveFilesFromIndex(ctx, filePath); err != nil {
return fmt.Errorf("remove object: %w", err)
}
return nil
}
func getFileEntry(
commit *git.Commit,
sha string,
path string,
) (*git.TreeEntry, error) {
entry, err := commit.GetTreeEntryByPath(path)
if err != nil {
return nil, err
}
// If a SHA was given and the SHA given doesn't match the SHA of the fromTreePath, throw error
if sha == "" || sha != entry.ID.String() {
return nil, fmt.Errorf("%w for path %s [given: %s, expected: %s]",
types.ErrSHADoesNotMatch, path, sha, entry.ID.String())
}
return entry, nil
}
func checkPath(commit *git.Commit, filePath string, isNewFile bool) error {
// For the path where this file will be created/updated, we need to make
// sure no parts of the path are existing files or links except for the last
// item in the path which is the file name, and that shouldn't exist IF it is
// a new file OR is being moved to a new path.
parts := strings.Split(filePath, "/")
subTreePath := ""
for index, part := range parts {
subTreePath = path.Join(subTreePath, part)
entry, err := commit.GetTreeEntryByPath(subTreePath)
if err != nil {
if git.IsErrNotExist(err) {
// Means there is no item with that name, so we're good
break
}
return err
}
switch {
case index < len(parts)-1:
if !entry.IsDir() {
return fmt.Errorf("a file %w where you're trying to create a subdirectory [path: %s]",
types.ErrAlreadyExists, subTreePath)
}
case entry.IsLink():
return fmt.Errorf("a symbolic link %w where you're trying to create a subdirectory [path: %s]",
types.ErrAlreadyExists, subTreePath)
case entry.IsDir():
return fmt.Errorf("a directory %w where you're trying to create a subdirectory [path: %s]",
types.ErrAlreadyExists, subTreePath)
case filePath != "" || isNewFile:
return fmt.Errorf("%s %w", filePath, types.ErrAlreadyExists)
}
}
return nil
}
func parsePayload(payload io.Reader, content io.Writer) (string, error) {
newPath := ""
reader := bufio.NewReader(payload)
// check for filePrefix
prefixBytes := make([]byte, len(filePrefix))
if _, err := reader.Read(prefixBytes); err != nil {
if errors.Is(err, io.EOF) {
return "", nil
}
return "", err
}
// check if payload starts with filePrefix constant
if bytes.Equal(prefixBytes, []byte(filePrefix)) {
filename, _ := reader.ReadString('\n') // no err handling because next statement will check filename
newPath = files.CleanUploadFileName(filename)
if newPath == "" {
return "", types.ErrInvalidPath
}
} else {
if _, err := content.Write(prefixBytes); err != nil {
return "", err
}
}
_, err := io.Copy(content, reader)
return newPath, err
}
// GetAuthorAndCommitter Gets the author and committer user objects from the Identity.
func GetAuthorAndCommitter(author, committer *rpc.Identity) (authorUser, committerUser *rpc.Identity) {
authorUser = author
committerUser = committer
if author == nil && committer == nil {
authorUser = SystemIdentity
committer = SystemIdentity
}
if author == nil && committer != nil {
authorUser = committer
}
if committer == nil && author != nil {
committerUser = author
}
return authorUser, committerUser
}