diff --git a/events/options.go b/events/options.go new file mode 100644 index 000000000..c45cbac2e --- /dev/null +++ b/events/options.go @@ -0,0 +1,58 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package events + +import ( + "time" + + "github.com/harness/gitness/stream" +) + +/* + * Expose event package options to simplify usage for consumers by hiding the stream package. + * Since we only forward the options, event options are simply aliases of stream options. + */ + +// ReaderOption can be used to configure event readers. +type ReaderOption stream.ConsumerOption + +func toStreamConsumerOptions(opts []ReaderOption) []stream.ConsumerOption { + streamOpts := make([]stream.ConsumerOption, len(opts)) + for i, opt := range opts { + streamOpts[i] = stream.ConsumerOption(opt) + } + return streamOpts +} + +// WithConcurrency sets up the concurrency of the reader. +func WithConcurrency(concurrency int) ReaderOption { + return stream.WithConcurrency(concurrency) +} + +// WithHandlerOptions sets up the default options for event handlers. +func WithHandlerOptions(opts ...HandlerOption) ReaderOption { + return stream.WithHandlerOptions(toStreamHandlerOptions(opts)...) +} + +// HandlerOption can be used to configure event handlers. +type HandlerOption stream.HandlerOption + +func toStreamHandlerOptions(opts []HandlerOption) []stream.HandlerOption { + streamOpts := make([]stream.HandlerOption, len(opts)) + for i, opt := range opts { + streamOpts[i] = stream.HandlerOption(opt) + } + return streamOpts +} + +// WithMaxRetries can be used to set the max retry count for a specific event handler. +func WithMaxRetries(maxRetries int) HandlerOption { + return stream.WithMaxRetries(maxRetries) +} + +// WithIdleTimeout can be used to set the idle timeout for a specific event handler. +func WithIdleTimeout(timeout time.Duration) HandlerOption { + return stream.WithIdleTimeout(timeout) +} diff --git a/events/reader.go b/events/reader.go index 8b19061ea..1d5ace254 100644 --- a/events/reader.go +++ b/events/reader.go @@ -10,7 +10,6 @@ import ( "encoding/gob" "errors" "fmt" - "time" "github.com/rs/zerolog/log" ) @@ -131,11 +130,11 @@ func (d *ReaderCanceler) Cancel() error { // Reader specifies the minimum functionality a reader should expose. // NOTE: we don't want to enforce any event registration methods here, allowing full control for customized readers. type Reader interface { - SetConcurrency(concurrency int) error - SetProcessingTimeout(timeout time.Duration) error - SetMaxRetryCount(retryCount int64) error + Configure(opts ...ReaderOption) } +type HandlerFunc[T interface{}] func(context.Context, *Event[T]) error + // GenericReader represents an event reader that supports registering type safe handlers // for an arbitrary set of custom events within a given event category using the ReaderRegisterEvent method. // NOTE: Optimally this should be an interface with RegisterEvent[T] method, but that's currently not possible in go. @@ -149,7 +148,7 @@ type GenericReader struct { // This method allows to register type safe handlers without the need of handling the raw stream payload. // NOTE: Generic arguments are not allowed for struct methods, hence pass the reader as input parameter. 
func ReaderRegisterEvent[T interface{}](reader *GenericReader, - eventType EventType, fn func(context.Context, *Event[T]) error) error { + eventType EventType, fn HandlerFunc[T], opts ...HandlerOption) error { streamID := getStreamID(reader.category, eventType) // register handler for event specific stream. @@ -208,17 +207,9 @@ func ReaderRegisterEvent[T interface{}](reader *GenericReader, // any other error we return as is return err - }) + }, toStreamHandlerOptions(opts)...) } -func (r *GenericReader) SetConcurrency(concurrency int) error { - return r.streamConsumer.SetConcurrency(concurrency) -} - -func (r *GenericReader) SetProcessingTimeout(timeout time.Duration) error { - return r.streamConsumer.SetProcessingTimeout(timeout) -} - -func (r *GenericReader) SetMaxRetryCount(retryCount int64) error { - return r.streamConsumer.SetMaxRetryCount(retryCount) +func (r *GenericReader) Configure(opts ...ReaderOption) { + r.streamConsumer.Configure(toStreamConsumerOptions(opts)...) } diff --git a/events/stream.go b/events/stream.go index dad7e296d..81664bca6 100644 --- a/events/stream.go +++ b/events/stream.go @@ -6,7 +6,6 @@ package events import ( "context" - "time" "github.com/harness/gitness/stream" ) @@ -18,10 +17,8 @@ type StreamProducer interface { // StreamConsumer is an abstraction of a consumer from the streams package. type StreamConsumer interface { - Register(streamID string, handler stream.HandlerFunc) error - SetConcurrency(int) error - SetProcessingTimeout(timeout time.Duration) error - SetMaxRetryCount(retryCount int64) error + Register(streamID string, handler stream.HandlerFunc, opts ...stream.HandlerOption) error + Configure(opts ...stream.ConsumerOption) Start(ctx context.Context) error Errors() <-chan error Infos() <-chan string diff --git a/events/wire.go b/events/wire.go index f4ba93a9d..459e16648 100644 --- a/events/wire.go +++ b/events/wire.go @@ -68,7 +68,7 @@ func provideSystemRedis(config Config, redisClient redis.UniversalClient) (*Syst func newMemoryStreamConsumerFactoryMethod(broker *stream.MemoryBroker, namespace string) StreamConsumerFactoryFunc { return func(groupName string, consumerName string) (StreamConsumer, error) { - return stream.NewMemoryConsumer(broker, namespace, groupName), nil + return stream.NewMemoryConsumer(broker, namespace, groupName) } } diff --git a/internal/events/git/branch.go b/internal/events/git/branch.go index 273013d6a..32589438e 100644 --- a/internal/events/git/branch.go +++ b/internal/events/git/branch.go @@ -31,8 +31,9 @@ func (r *Reporter) BranchCreated(ctx context.Context, payload *BranchCreatedPayl log.Ctx(ctx).Debug().Msgf("reported branch created event with id '%s'", eventID) } -func (r *Reader) RegisterBranchCreated(fn func(context.Context, *events.Event[*BranchCreatedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, BranchCreatedEvent, fn) +func (r *Reader) RegisterBranchCreated(fn events.HandlerFunc[*BranchCreatedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, BranchCreatedEvent, fn, opts...) 
} const BranchUpdatedEvent events.EventType = "branch-updated" @@ -56,8 +57,9 @@ func (r *Reporter) BranchUpdated(ctx context.Context, payload *BranchUpdatedPayl log.Ctx(ctx).Debug().Msgf("reported branch updated event with id '%s'", eventID) } -func (r *Reader) RegisterBranchUpdated(fn func(context.Context, *events.Event[*BranchUpdatedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn) +func (r *Reader) RegisterBranchUpdated(fn events.HandlerFunc[*BranchUpdatedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn, opts...) } const BranchDeletedEvent events.EventType = "branch-deleted" @@ -79,6 +81,7 @@ func (r *Reporter) BranchDeleted(ctx context.Context, payload *BranchDeletedPayl log.Ctx(ctx).Debug().Msgf("reported branch deleted event with id '%s'", eventID) } -func (r *Reader) RegisterBranchDeleted(fn func(context.Context, *events.Event[*BranchDeletedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn) +func (r *Reader) RegisterBranchDeleted(fn events.HandlerFunc[*BranchDeletedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn, opts...) } diff --git a/internal/events/git/reader.go b/internal/events/git/reader.go index 7263a2482..0d0513103 100644 --- a/internal/events/git/reader.go +++ b/internal/events/git/reader.go @@ -5,8 +5,6 @@ package events import ( - "time" - "github.com/harness/gitness/events" ) @@ -27,14 +25,6 @@ type Reader struct { innerReader *events.GenericReader } -func (r *Reader) SetConcurrency(concurrency int) error { - return r.innerReader.SetConcurrency(concurrency) -} - -func (r *Reader) SetProcessingTimeout(timeout time.Duration) error { - return r.innerReader.SetProcessingTimeout(timeout) -} - -func (r *Reader) SetMaxRetryCount(retryCount int64) error { - return r.innerReader.SetMaxRetryCount(retryCount) +func (r *Reader) Configure(opts ...events.ReaderOption) { + r.innerReader.Configure(opts...) } diff --git a/internal/events/git/tag.go b/internal/events/git/tag.go index ff4646d1f..8908ab00e 100644 --- a/internal/events/git/tag.go +++ b/internal/events/git/tag.go @@ -31,8 +31,9 @@ func (r *Reporter) TagCreated(ctx context.Context, payload *TagCreatedPayload) { log.Ctx(ctx).Debug().Msgf("reported tag created event with id '%s'", eventID) } -func (r *Reader) RegisterTagCreated(fn func(context.Context, *events.Event[*TagCreatedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, TagCreatedEvent, fn) +func (r *Reader) RegisterTagCreated(fn events.HandlerFunc[*TagCreatedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, TagCreatedEvent, fn, opts...) } const TagUpdatedEvent events.EventType = "tag-updated" @@ -56,8 +57,9 @@ func (r *Reporter) TagUpdated(ctx context.Context, payload *TagUpdatedPayload) { log.Ctx(ctx).Debug().Msgf("reported tag updated event with id '%s'", eventID) } -func (r *Reader) RegisterTagUpdated(fn func(context.Context, *events.Event[*TagUpdatedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, TagUpdatedEvent, fn) +func (r *Reader) RegisterTagUpdated(fn events.HandlerFunc[*TagUpdatedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, TagUpdatedEvent, fn, opts...) 
} const TagDeletedEvent events.EventType = "tag-deleted" @@ -79,6 +81,7 @@ func (r *Reporter) TagDeleted(ctx context.Context, payload *TagDeletedPayload) { log.Ctx(ctx).Debug().Msgf("reported tag deleted event with id '%s'", eventID) } -func (r *Reader) RegisterTagDeleted(fn func(context.Context, *events.Event[*TagDeletedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, TagDeletedEvent, fn) +func (r *Reader) RegisterTagDeleted(fn events.HandlerFunc[*TagDeletedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, TagDeletedEvent, fn, opts...) } diff --git a/internal/events/pullreq/events.go b/internal/events/pullreq/events.go index b7137e0a4..8572a505e 100644 --- a/internal/events/pullreq/events.go +++ b/internal/events/pullreq/events.go @@ -44,8 +44,9 @@ func (r *Reporter) Created(ctx context.Context, payload *CreatedPayload) { log.Ctx(ctx).Debug().Msgf("reported pull request created event with id '%s'", eventID) } -func (r *Reader) RegisterCreated(fn func(context.Context, *events.Event[*CreatedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, CreatedEvent, fn) +func (r *Reader) RegisterCreated(fn events.HandlerFunc[*CreatedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, CreatedEvent, fn, opts...) } const TitleChangedEvent events.EventType = "title-changed" @@ -70,8 +71,9 @@ func (r *Reporter) TitleChanged(ctx context.Context, payload *TitleChangedPayloa log.Ctx(ctx).Debug().Msgf("reported pull request title changed event with id '%s'", eventID) } -func (r *Reader) RegisterTitleChanged(fn func(context.Context, *events.Event[*TitleChangedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, TitleChangedEvent, fn) +func (r *Reader) RegisterTitleChanged(fn events.HandlerFunc[*TitleChangedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, TitleChangedEvent, fn, opts...) } const BranchUpdatedEvent events.EventType = "branch-updated" @@ -96,8 +98,9 @@ func (r *Reporter) BranchUpdated(ctx context.Context, payload *BranchUpdatedPayl log.Ctx(ctx).Debug().Msgf("reported pull request branch updated event with id '%s'", eventID) } -func (r *Reader) RegisterBranchUpdated(fn func(context.Context, *events.Event[*BranchUpdatedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn) +func (r *Reader) RegisterBranchUpdated(fn events.HandlerFunc[*BranchUpdatedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn, opts...) } const StateChangedEvent events.EventType = "state-changed" @@ -126,8 +129,9 @@ func (r *Reporter) StateChanged(ctx context.Context, payload *StateChangedPayloa log.Ctx(ctx).Debug().Msgf("reported pull request state changed event with id '%s'", eventID) } -func (r *Reader) RegisterStateChanged(fn func(context.Context, *events.Event[*StateChangedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, StateChangedEvent, fn) +func (r *Reader) RegisterStateChanged(fn events.HandlerFunc[*StateChangedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, StateChangedEvent, fn, opts...) 
} const BranchDeletedEvent events.EventType = "branch-deleted" @@ -151,8 +155,9 @@ func (r *Reporter) BranchDeleted(ctx context.Context, payload *BranchDeletedPayl log.Ctx(ctx).Debug().Msgf("reported pull request branch deleted event with id '%s'", eventID) } -func (r *Reader) RegisterBranchDeleted(fn func(context.Context, *events.Event[*BranchDeletedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn) +func (r *Reader) RegisterBranchDeleted(fn events.HandlerFunc[*BranchDeletedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn, opts...) } const ReviewSubmittedEvent events.EventType = "review-submitted" @@ -177,8 +182,9 @@ func (r *Reporter) ReviewSubmitted(ctx context.Context, payload *ReviewSubmitted log.Ctx(ctx).Debug().Msgf("reported pull request review submitted with id '%s'", eventID) } -func (r *Reader) RegisterReviewSubmitted(fn func(context.Context, *events.Event[*ReviewSubmittedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, ReviewSubmittedEvent, fn) +func (r *Reader) RegisterReviewSubmitted(fn events.HandlerFunc[*ReviewSubmittedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, ReviewSubmittedEvent, fn, opts...) } const MergedEvent events.EventType = "merged" @@ -203,6 +209,7 @@ func (r *Reporter) Merged(ctx context.Context, payload *MergedPayload) { log.Ctx(ctx).Debug().Msgf("reported pull request merged event with id '%s'", eventID) } -func (r *Reader) RegisterMerged(fn func(context.Context, *events.Event[*MergedPayload]) error) error { - return events.ReaderRegisterEvent(r.innerReader, MergedEvent, fn) +func (r *Reader) RegisterMerged(fn events.HandlerFunc[*MergedPayload], + opts ...events.HandlerOption) error { + return events.ReaderRegisterEvent(r.innerReader, MergedEvent, fn, opts...) } diff --git a/internal/events/pullreq/reader.go b/internal/events/pullreq/reader.go index 7daf3ff77..aa383619a 100644 --- a/internal/events/pullreq/reader.go +++ b/internal/events/pullreq/reader.go @@ -5,8 +5,6 @@ package events import ( - "time" - "github.com/harness/gitness/events" ) @@ -25,14 +23,6 @@ type Reader struct { innerReader *events.GenericReader } -func (r *Reader) SetConcurrency(concurrency int) error { - return r.innerReader.SetConcurrency(concurrency) -} - -func (r *Reader) SetProcessingTimeout(timeout time.Duration) error { - return r.innerReader.SetProcessingTimeout(timeout) -} - -func (r *Reader) SetMaxRetryCount(retryCount int64) error { - return r.innerReader.SetMaxRetryCount(retryCount) +func (r *Reader) Configure(opts ...events.ReaderOption) { + r.innerReader.Configure(opts...) 
} diff --git a/internal/services/pullreq/pullreq.go b/internal/services/pullreq/pullreq.go index b36c38740..ed8ad26de 100644 --- a/internal/services/pullreq/pullreq.go +++ b/internal/services/pullreq/pullreq.go @@ -13,6 +13,7 @@ import ( gitevents "github.com/harness/gitness/internal/events/git" pullreqevents "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/store" + "github.com/harness/gitness/stream" "github.com/harness/gitness/types" "github.com/jmoiron/sqlx" @@ -57,10 +58,13 @@ func New(ctx context.Context, const groupGit = "gitness:pullreq:git" _, err = gitReaderFactory.Launch(ctx, groupGit, config.InstanceID, func(r *gitevents.Reader) error { - const processingTimeout = 15 * time.Second - _ = r.SetConcurrency(1) - _ = r.SetMaxRetryCount(3) - _ = r.SetProcessingTimeout(processingTimeout) + const idleTimeout = 15 * time.Second + r.Configure( + stream.WithConcurrency(1), + stream.WithHandlerOptions( + stream.WithIdleTimeout(idleTimeout), + stream.WithMaxRetries(3), + )) _ = r.RegisterBranchUpdated(service.triggerPullReqBranchUpdate) _ = r.RegisterBranchDeleted(service.closePullReqBranchDelete) @@ -76,11 +80,13 @@ func New(ctx context.Context, const groupActivity = "gitness:pullreq:activity" _, err = pullreqEvReaderFactory.Launch(ctx, groupActivity, config.InstanceID, func(r *pullreqevents.Reader) error { - const processingTimeout = 10 * time.Second - _ = r.SetConcurrency(1) - _ = r.SetMaxRetryCount(3) - _ = r.SetProcessingTimeout(processingTimeout) - + const idleTimeout = 10 * time.Second + r.Configure( + stream.WithConcurrency(1), + stream.WithHandlerOptions( + stream.WithIdleTimeout(idleTimeout), + stream.WithMaxRetries(3), + )) _ = r.RegisterBranchUpdated(service.addActivityBranchUpdate) _ = r.RegisterBranchDeleted(service.addActivityBranchDelete) _ = r.RegisterStateChanged(service.addActivityStateChange) @@ -99,10 +105,13 @@ func New(ctx context.Context, const groupPullReqHeadRef = "gitness:pullreq:headref" _, err = pullreqEvReaderFactory.Launch(ctx, groupPullReqHeadRef, config.InstanceID, func(r *pullreqevents.Reader) error { - const processingTimeout = 10 * time.Second - _ = r.SetConcurrency(1) - _ = r.SetMaxRetryCount(3) - _ = r.SetProcessingTimeout(processingTimeout) + const idleTimeout = 10 * time.Second + r.Configure( + stream.WithConcurrency(1), + stream.WithHandlerOptions( + stream.WithIdleTimeout(idleTimeout), + stream.WithMaxRetries(3), + )) _ = r.RegisterCreated(service.createHeadRefCreated) _ = r.RegisterBranchUpdated(service.updateHeadRefBranchUpdate) diff --git a/internal/services/webhook/service.go b/internal/services/webhook/service.go index a87e7a066..d28cf32e8 100644 --- a/internal/services/webhook/service.go +++ b/internal/services/webhook/service.go @@ -16,17 +16,17 @@ import ( gitevents "github.com/harness/gitness/internal/events/git" "github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/url" + "github.com/harness/gitness/stream" ) const ( eventsReaderGroupName = "gitness:webhook" - processingTimeout = 2 * time.Minute ) type Config struct { EventReaderName string `envconfig:"GITNESS_WEBHOOK_EVENT_READER_NAME"` Concurrency int `envconfig:"GITNESS_WEBHOOK_CONCURRENCY" default:"4"` - MaxRetryCount int64 `envconfig:"GITNESS_WEBHOOK_MAX_RETRY_COUNT" default:"3"` + MaxRetries int `envconfig:"GITNESS_WEBHOOK_MAX_RETRIES" default:"3"` AllowPrivateNetwork bool `envconfig:"GITNESS_WEBHOOK_ALLOW_PRIVATE_NETWORK" default:"false"` AllowLoopback bool `envconfig:"GITNESS_WEBHOOK_ALLOW_LOOPBACK" default:"false"` 
} @@ -41,8 +41,8 @@ func (c *Config) Validate() error { if c.Concurrency < 1 { return errors.New("config.Concurrency has to be a positive number") } - if c.MaxRetryCount < 0 { - return errors.New("config.MaxRetryCount can't be negative") + if c.MaxRetries < 0 { + return errors.New("config.MaxRetries can't be negative") } return nil @@ -86,10 +86,13 @@ func NewService(ctx context.Context, config Config, } canceler, err := gitReaderFactory.Launch(ctx, eventsReaderGroupName, config.EventReaderName, func(r *gitevents.Reader) error { - // configure reader - _ = r.SetConcurrency(config.Concurrency) - _ = r.SetMaxRetryCount(config.MaxRetryCount) - _ = r.SetProcessingTimeout(processingTimeout) + const idleTimeout = 1 * time.Minute + r.Configure( + stream.WithConcurrency(config.Concurrency), + stream.WithHandlerOptions( + stream.WithIdleTimeout(idleTimeout), + stream.WithMaxRetries(config.MaxRetries), + )) // register events _ = r.RegisterBranchCreated(service.handleEventBranchCreated) diff --git a/stream/memory_consumer.go b/stream/memory_consumer.go index 24eb08524..7f35fb6b2 100644 --- a/stream/memory_consumer.go +++ b/stream/memory_consumer.go @@ -23,46 +23,63 @@ type memoryMessage struct { type MemoryConsumer struct { broker *MemoryBroker // namespace specifies the namespace of the keys - any stream key will be prefixed with it - namespace string - concurrency int - maxRetryCount int64 - groupName string - streams map[string]HandlerFunc + namespace string + // groupName specifies the name of the consumer group. + groupName string - state consumerState + // Config is the generic consumer configuration. + Config ConsumerConfig + + // streams is a map of all registered streams and their handlers. + streams map[string]handler + + isStarted bool messageQueue chan memoryMessage errorCh chan error infoCh chan string } -func NewMemoryConsumer(broker *MemoryBroker, namespace string, groupName string) *MemoryConsumer { +func NewMemoryConsumer(broker *MemoryBroker, namespace string, groupName string) (*MemoryConsumer, error) { + if groupName == "" { + return nil, errors.New("groupName can't be empty") + } + const queueCapacity = 500 const errorChCapacity = 64 const infoChCapacity = 64 - const concurrency = 1 + return &MemoryConsumer{ broker: broker, namespace: namespace, - concurrency: concurrency, groupName: groupName, - streams: make(map[string]HandlerFunc), - state: consumerStateSetup, + streams: map[string]handler{}, + Config: defaultConfig, + isStarted: false, messageQueue: make(chan memoryMessage, queueCapacity), errorCh: make(chan error, errorChCapacity), infoCh: make(chan string, infoChCapacity), + }, nil +} + +func (c *MemoryConsumer) Configure(opts ...ConsumerOption) { + if c.isStarted { + return + } + + for _, opt := range opts { + opt.apply(&c.Config) } } -func (c *MemoryConsumer) Register(streamID string, handler HandlerFunc) error { - if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil { - return err +func (c *MemoryConsumer) Register(streamID string, fn HandlerFunc, opts ...HandlerOption) error { + if c.isStarted { + return ErrAlreadyStarted } - if streamID == "" { return errors.New("streamID can't be empty") } - if handler == nil { - return errors.New("handler can't be empty") + if fn == nil { + return errors.New("fn can't be empty") } // transpose streamID to key namespace - no need to keep inner streamID @@ -71,59 +88,29 @@ func (c *MemoryConsumer) Register(streamID string, handler HandlerFunc) error { return fmt.Errorf("consumer is already registered for 
'%s' (full stream '%s')", streamID, transposedStreamID) } - c.streams[transposedStreamID] = handler - return nil -} - -func (c *MemoryConsumer) SetConcurrency(concurrency int) error { - if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil { - return err + config := c.Config.DefaultHandlerConfig + for _, opt := range opts { + opt.apply(&config) } - if concurrency < 1 || concurrency > MaxConcurrency { - return fmt.Errorf("concurrency has to be between 1 and %d (inclusive)", MaxConcurrency) + c.streams[transposedStreamID] = handler{ + handle: fn, + config: config, } - - c.concurrency = concurrency - - return nil -} - -func (c *MemoryConsumer) SetMaxRetryCount(retryCount int64) error { - if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil { - return err - } - - if retryCount < 1 || retryCount > MaxRetryCount { - return fmt.Errorf("max retry count has to be between 1 and %d (inclusive)", MaxRetryCount) - } - - c.maxRetryCount = retryCount - - return nil -} - -func (c *MemoryConsumer) SetProcessingTimeout(timeout time.Duration) error { - if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil { - return err - } - - // we don't have an idle timeout for this implementation - return nil } func (c *MemoryConsumer) Start(ctx context.Context) error { - if err := checkConsumerStateTransition(c.state, consumerStateStarted); err != nil { - return err + if c.isStarted { + return ErrAlreadyStarted } if len(c.streams) == 0 { return errors.New("no streams registered") } - // update state to started before starting go routines (can't error out from here) - c.state = consumerStateStarted + // mark as started before starting go routines (can't error out from here) + c.isStarted = true wg := &sync.WaitGroup{} @@ -137,7 +124,7 @@ func (c *MemoryConsumer) Start(ctx context.Context) error { } // start workers - for i := 0; i < c.concurrency; i++ { + for i := 0; i < c.Config.Concurrency; i++ { wg.Add(1) go func() { defer wg.Done() @@ -150,9 +137,6 @@ func (c *MemoryConsumer) Start(ctx context.Context) error { // wait for all go routines to complete wg.Wait() - // update state to finished - c.state = consumerStateFinished - close(c.messageQueue) close(c.infoCh) close(c.errorCh) @@ -184,7 +168,7 @@ func (c *MemoryConsumer) consume(ctx context.Context) { case <-ctx.Done(): return case m := <-c.messageQueue: - fn, ok := c.streams[m.streamID] + handler, ok := c.streams[m.streamID] if !ok { // we only take messages from registered streams, this should never happen. // WARNING this will discard the message @@ -202,14 +186,14 @@ func (c *MemoryConsumer) consume(ctx context.Context) { } }() - return fn(ctx, m.id, m.values) + return handler.handle(ctx, m.id, m.values) }() if err != nil { c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w", m.id, m.streamID, m.retries, err)) - if m.retries >= c.maxRetryCount { + if m.retries >= int64(handler.config.maxRetries) { c.pushError(fmt.Errorf( "discard message with id '%s' from stream '%s' - failed %d retries", m.id, m.streamID, m.retries)) @@ -223,7 +207,7 @@ func (c *MemoryConsumer) consume(ctx context.Context) { // IMPORTANT: this won't requeue to broker, only in this consumer's queue! 
 				go func() {
 					// TODO: linear/exponential backoff relative to retry count might be good
-					time.Sleep(5 * time.Second)
+					time.Sleep(handler.config.idleTimeout)
 					c.messageQueue <- m
 				}()
 			}
diff --git a/stream/options.go b/stream/options.go
new file mode 100644
index 000000000..3a041f1c3
--- /dev/null
+++ b/stream/options.go
@@ -0,0 +1,90 @@
+// Copyright 2022 Harness Inc. All rights reserved.
+// Use of this source code is governed by the Polyform Free Trial License
+// that can be found in the LICENSE.md file for this repository.
+
+package stream
+
+import (
+	"fmt"
+	"time"
+)
+
+const (
+	// MaxConcurrency is the max number of concurrent go routines (for message handling) for a single stream consumer.
+	MaxConcurrency = 64
+
+	// MaxMaxRetries is the max number of retries of a message for a single consumer group.
+	MaxMaxRetries = 64
+
+	// MinIdleTimeout is the minimum time that can be configured as idle timeout for a stream consumer.
+	MinIdleTimeout = 5 * time.Second
+)
+
+// ConsumerOption is used to configure consumers.
+type ConsumerOption interface {
+	apply(*ConsumerConfig)
+}
+
+// consumerOptionFunc allows to have functions implement the ConsumerOption interface.
+type consumerOptionFunc func(*ConsumerConfig)
+
+// Apply calls f(config).
+func (f consumerOptionFunc) apply(config *ConsumerConfig) {
+	f(config)
+}
+
+// WithConcurrency sets up the concurrency of the stream consumer.
+func WithConcurrency(concurrency int) ConsumerOption {
+	if concurrency < 1 || concurrency > MaxConcurrency {
+		// misconfiguration - panic to keep options clean
+		panic(fmt.Sprintf("provided concurrency %d is invalid - has to be between 1 and %d",
+			concurrency, MaxConcurrency))
+	}
+	return consumerOptionFunc(func(c *ConsumerConfig) {
+		c.Concurrency = concurrency
+	})
+}
+
+// WithHandlerOptions sets up the default handler options of a stream consumer.
+func WithHandlerOptions(opts ...HandlerOption) ConsumerOption {
+	return consumerOptionFunc(func(c *ConsumerConfig) {
+		for _, opt := range opts {
+			opt.apply(&c.DefaultHandlerConfig)
+		}
+	})
+}
+
+// HandlerOption is used to configure the handler consuming a single stream.
+type HandlerOption interface {
+	apply(*HandlerConfig)
+}
+
+// handlerOptionFunc allows to have functions implement the HandlerOption interface.
+type handlerOptionFunc func(*HandlerConfig)
+
+// Apply calls f(config).
+func (f handlerOptionFunc) apply(config *HandlerConfig) {
+	f(config)
+}
+
+// WithMaxRetries can be used to set the max retry count for a specific handler.
+func WithMaxRetries(maxRetries int) HandlerOption {
+	if maxRetries < 0 || maxRetries > MaxMaxRetries {
+		// misconfiguration - panic to keep options clean
+		panic(fmt.Sprintf("provided maxRetries %d is invalid - has to be between 0 and %d", maxRetries, MaxMaxRetries))
+	}
+	return handlerOptionFunc(func(c *HandlerConfig) {
+		c.maxRetries = maxRetries
+	})
+}
+
+// WithIdleTimeout can be used to set the idle timeout for a specific handler.
+func WithIdleTimeout(timeout time.Duration) HandlerOption {
+	if timeout < MinIdleTimeout {
+		// misconfiguration - panic to keep options clean
+		panic(fmt.Sprintf("provided timeout %s is invalid - has to be at least %s", timeout, MinIdleTimeout))
+	}
+	return handlerOptionFunc(func(c *HandlerConfig) {
+		c.idleTimeout = timeout
+	})
+}
diff --git a/stream/redis_consumer.go b/stream/redis_consumer.go
index 8a0a01dd7..1182d9599 100644
--- a/stream/redis_consumer.go
+++ b/stream/redis_consumer.go
@@ -22,22 +22,18 @@ type RedisConsumer struct {
 	rdb redis.UniversalClient
 	// namespace specifies the namespace of the keys - any stream key will be prefixed with it
 	namespace string
-	// groupName specifies the name of the consumer group
+	// groupName specifies the name of the consumer group.
 	groupName string
-	// consumerName specifies the name of the consumer
+	// consumerName specifies the name of the consumer.
 	consumerName string
-	// processingTimeout specifies the maximum duration a message stays read but unacknowleged
-	// before it can be claimed by others.
-	processingTimeout time.Duration
-	// streams is a map of all streams to consum and their handler function
-	streams map[string]HandlerFunc
-	// concurrency specifies the number of worker go routines
-	concurrency int
-	// maxRetryCount specifies the max number an event is retried
-	maxRetryCount int64
+	// Config is the generic consumer configuration.
+	Config ConsumerConfig
 
-	state consumerState
+	// streams is a map of all registered streams and their handlers.
+	streams map[string]handler
+
+	isStarted bool
 
 	messageQueue chan message
 	errorCh chan error
 	infoCh chan string
@@ -58,34 +54,40 @@ func NewRedisConsumer(rdb redis.UniversalClient, namespace string,
 	const queueCapacity = 500
 	const errorChCapacity = 64
 	const infoChCapacity = 64
-	const concurrency = 2
-	const processingTimeout = 5 * time.Minute
 
 	return &RedisConsumer{
-		rdb:               rdb,
-		namespace:         namespace,
-		groupName:         groupName,
-		consumerName:      consumerName,
-		streams:           map[string]HandlerFunc{},
-		processingTimeout: processingTimeout,
-		concurrency:       concurrency,
-		state:             consumerStateSetup,
-		messageQueue:      make(chan message, queueCapacity),
-		errorCh:           make(chan error, errorChCapacity),
-		infoCh:            make(chan string, infoChCapacity),
+		rdb:          rdb,
+		namespace:    namespace,
+		groupName:    groupName,
+		consumerName: consumerName,
+		streams:      map[string]handler{},
+		Config:       defaultConfig,
+		isStarted:    false,
+		messageQueue: make(chan message, queueCapacity),
+		errorCh:      make(chan error, errorChCapacity),
+		infoCh:       make(chan string, infoChCapacity),
 	}, nil
 }
 
-func (c *RedisConsumer) Register(streamID string, handler HandlerFunc) error {
-	if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
-		return err
+func (c *RedisConsumer) Configure(opts ...ConsumerOption) {
+	if c.isStarted {
+		return
 	}
 
+	for _, opt := range opts {
+		opt.apply(&c.Config)
+	}
+}
+
+func (c *RedisConsumer) Register(streamID string, fn HandlerFunc, opts ...HandlerOption) error {
+	if c.isStarted {
+		return ErrAlreadyStarted
+	}
 	if streamID == "" {
 		return errors.New("streamID can't be empty")
 	}
-	if handler == nil {
-		return errors.New("handler can't be empty")
+	if fn == nil {
+		return errors.New("fn can't be empty")
 	}
 
 	// transpose streamID to key namespace - no need to keep inner streamID
@@ -94,55 +96,23 @@ func (c *RedisConsumer) Register(streamID string, handler HandlerFunc) error {
 		return fmt.Errorf("consumer is already registered for '%s' (redis stream '%s')", streamID, transposedStreamID)
} - c.streams[transposedStreamID] = handler - return nil -} - -func (c *RedisConsumer) SetConcurrency(concurrency int) error { - if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil { - return err + // create final config for handler + config := c.Config.DefaultHandlerConfig + for _, opt := range opts { + opt.apply(&config) } - if concurrency < 1 || concurrency > MaxConcurrency { - return fmt.Errorf("concurrency has to be between 1 and %d (inclusive)", MaxConcurrency) + c.streams[transposedStreamID] = handler{ + handle: fn, + config: config, } - c.concurrency = concurrency - - return nil -} - -func (c *RedisConsumer) SetProcessingTimeout(timeout time.Duration) error { - if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil { - return err - } - - if timeout < MinProcessingTimeout { - return fmt.Errorf("processing timeout %v is too short, it has to be at least %v", timeout, MinProcessingTimeout) - } - - c.processingTimeout = timeout - - return nil -} - -func (c *RedisConsumer) SetMaxRetryCount(retryCount int64) error { - if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil { - return err - } - - if retryCount < 1 || retryCount > MaxRetryCount { - return fmt.Errorf("max retry count has to be between 1 and %d (inclusive)", MaxRetryCount) - } - - c.maxRetryCount = retryCount - return nil } func (c *RedisConsumer) Start(ctx context.Context) error { - if err := checkConsumerStateTransition(c.state, consumerStateStarted); err != nil { - return err + if c.isStarted { + return ErrAlreadyStarted } if len(c.streams) == 0 { @@ -163,8 +133,8 @@ func (c *RedisConsumer) Start(ctx context.Context) error { return err } - // update state to started before starting go routines (can't error out from here) - c.state = consumerStateStarted + // mark as started before starting go routines (can't error out from here) + c.isStarted = true wg := &sync.WaitGroup{} @@ -179,11 +149,15 @@ func (c *RedisConsumer) Start(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - // launch redis message reclaimer, it will finish when the ctx is done - c.reclaimer(ctx, time.Minute) + // launch redis message reclaimer, it will finish when the ctx is done. + // IMPORTANT: Keep reclaim interval small for now to support faster retries => higher load on redis! + // TODO: Make retries local by default with opt-in cross-instance retries. 
+ // https://harness.atlassian.net/browse/SCM-83 + const reclaimInterval = 10 * time.Second + c.reclaimer(ctx, reclaimInterval) }() - for i := 0; i < c.concurrency; i++ { + for i := 0; i < c.Config.Concurrency; i++ { wg.Add(1) go func() { defer wg.Done() @@ -196,9 +170,6 @@ func (c *RedisConsumer) Start(ctx context.Context) error { // wait for all go routines to complete wg.Wait() - // update state to finished - c.state = consumerStateFinished - // close all channels close(c.messageQueue) close(c.errorCh) @@ -372,31 +343,29 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura case <-ctx.Done(): return case <-reclaimTimer.C: - for streamID := range c.streams { + for streamID, handler := range c.streams { resPending, errPending := c.rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: streamID, Group: c.groupName, Start: start, End: end, - Idle: c.processingTimeout, + Idle: handler.config.idleTimeout, Count: int64(count), }).Result() if errPending != nil && !errors.Is(errPending, redis.Nil) { c.pushError(fmt.Errorf("failed to fetch pending messages: %w", errPending)) - reclaimTimer.Reset(reclaimInterval) - break + continue } if len(resPending) == 0 { - reclaimTimer.Reset(reclaimInterval) - break + continue } // It's safe to change start of the requested range for the next iteration to oldest message. start = resPending[0].ID for _, resMessage := range resPending { - if resMessage.RetryCount > c.maxRetryCount { + if resMessage.RetryCount > int64(handler.config.maxRetries) { // Retry count gets increased after every XCLAIM. // Large retry count might mean there is something wrong with the message, so we'll XACK it. // WARNING this will discard the message! @@ -406,9 +375,10 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura "failed to force acknowledge (discard) message '%s' (Retries: %d) in stream '%s': %w", resMessage.ID, resMessage.RetryCount, streamID, errAck)) } else { + retryCount := resMessage.RetryCount - 1 // redis is counting this execution as retry c.pushError(fmt.Errorf( "force acknowledged (discarded) message '%s' (Retries: %d) in stream '%s'", - resMessage.ID, resMessage.RetryCount, streamID)) + resMessage.ID, retryCount, streamID)) } continue } @@ -418,7 +388,7 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura Stream: streamID, Group: c.groupName, Consumer: c.consumerName, - MinIdle: c.processingTimeout, + MinIdle: handler.config.idleTimeout, Messages: []string{resMessage.ID}, }).Result() @@ -475,9 +445,9 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura } else { count = baseCount } - - reclaimTimer.Reset(reclaimInterval) } + + reclaimTimer.Reset(reclaimInterval) } } } @@ -494,7 +464,7 @@ func (c *RedisConsumer) consumer(ctx context.Context) { return } - fn, ok := c.streams[m.streamID] + handler, ok := c.streams[m.streamID] if !ok { // we don't want to ack the message // maybe someone else can claim and process it (worst case it expires) @@ -512,7 +482,7 @@ func (c *RedisConsumer) consumer(ctx context.Context) { } }() - return fn(ctx, m.id, m.values) + return handler.handle(ctx, m.id, m.values) }() if err != nil { c.pushError(fmt.Errorf("failed to process message '%s' in stream '%s': %w", m.id, m.streamID, err)) diff --git a/stream/stream.go b/stream/stream.go index 414715d5f..5c0899c55 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -6,24 +6,53 @@ package stream import ( "context" + "errors" "fmt" "time" ) -const ( - // MaxConcurrency is 
the max number of concurrent go routines (for message handling) for a single stream consumer.
-	MaxConcurrency = 64
+var (
+	ErrAlreadyStarted = errors.New("consumer already started")
 
-	// MaxRetryCount is the max number of retries of an event for a single consumer group.
-	MaxRetryCount = 64
-
-	// MinProcessingTimeout is the minumum time that can be configured as processing timeout for a stream consumer.
-	MinProcessingTimeout = 1 * time.Minute
+	defaultConfig = ConsumerConfig{
+		Concurrency: 2,
+		DefaultHandlerConfig: HandlerConfig{
+			idleTimeout: 1 * time.Minute,
+			maxRetries:  2,
+		},
+	}
 )
 
+// ConsumerConfig defines the configuration of a consumer containing externally exposed values
+// that can be configured using the available ConsumerOptions.
+type ConsumerConfig struct {
+	// Concurrency specifies the number of worker go routines executing stream handlers.
+	Concurrency int
+
+	// DefaultHandlerConfig is the default config used for stream handlers.
+	DefaultHandlerConfig HandlerConfig
+}
+
+// HandlerConfig defines the configuration for a single stream handler containing externally exposed values
+// that can be configured using the available HandlerOptions.
+type HandlerConfig struct {
+	// idleTimeout specifies the maximum duration a message stays read but unacknowledged
+	// before it can be claimed by others.
+	idleTimeout time.Duration
+
+	// maxRetries specifies the max number of times a stream message is retried.
+	maxRetries int
+}
+
 // HandlerFunc defines the signature of a function handling stream messages.
 type HandlerFunc func(ctx context.Context, messageID string, payload map[string]interface{}) error
 
+// handler defines a handler of a single stream.
+type handler struct {
+	handle HandlerFunc
+	config HandlerConfig
+}
+
 // message is used internally for passing stream messages via channels.
 type message struct {
 	streamID string
@@ -35,33 +64,3 @@ type message struct {
 func transposeStreamID(namespace string, streamID string) string {
 	return fmt.Sprintf("%s:%s", namespace, streamID)
 }
-
-// consumerState specifies the different states of a consumer.
-type consumerState string
-
-const (
-	// consumerStateSetup defines the state in which the consumer is being setup (Register, SetConcurrency, ...).
-	// In other words, it's the state before the consumer was started.
-	consumerStateSetup consumerState = "setup"
-
-	// consumerStateStarted defines the state after the consumer was started (context not yet canceled).
-	consumerStateStarted consumerState = "started"
-
-	// consumerStateFinished defines the state after the consumer has been stopped (context canceled).
-	consumerStateFinished consumerState = "finished"
-)
-
-// checkConsumerStateTransition returns an error in case the state transition is not allowed, nil otherwise.
-// It is used to avoid that invalid operations are being executed in a given state (e.g. Register(...) when started).
-func checkConsumerStateTransition(current, updated consumerState) error {
-	switch {
-	case current == consumerStateSetup && updated == consumerStateSetup:
-		return nil
-	case current == consumerStateSetup && updated == consumerStateStarted:
-		return nil
-	case current == consumerStateStarted && updated == consumerStateFinished:
-		return nil
-	default:
-		return fmt.Errorf("consumer state transition from '%s' to '%s' is not possible", current, updated)
-	}
-}
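Example usage (illustrative sketch, not part of the change): the snippet below shows how a consumer of the new option-based API would configure a git event reader inside a factory Launch callback. The Configure/Register signatures and the With* options are the ones introduced in this change; the function name, handler body, and concrete values are assumptions for the sake of the example.

package example

import (
	"context"
	"time"

	"github.com/harness/gitness/events"
	gitevents "github.com/harness/gitness/internal/events/git"
)

// setupGitReader is a hypothetical Launch callback: reader-level defaults are set once via
// Configure, and individual handlers can override them through HandlerOptions.
func setupGitReader(r *gitevents.Reader) error {
	// reader-level defaults shared by all handlers registered on this reader.
	r.Configure(
		events.WithConcurrency(2),
		events.WithHandlerOptions(
			events.WithIdleTimeout(30*time.Second),
			events.WithMaxRetries(3),
		),
	)

	// per-handler override: this handler keeps the reader defaults except for a higher retry count.
	return r.RegisterBranchCreated(
		func(ctx context.Context, e *events.Event[*gitevents.BranchCreatedPayload]) error {
			// handle the event here ...
			return nil
		},
		events.WithMaxRetries(5),
	)
}

Handler options compose with the reader defaults: Register seeds the handler's config from Config.DefaultHandlerConfig and then applies the per-handler options on top, as shown in the Register implementations above.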