[Events] introduce options to event and stream framework (#255)

This change is adding the following:
- Add ConsumerOption and HandlerOption for configuring stream consumers and handlers.
- Add Readeroption and HandlerOption for configuring event readers and handlers
- Add HandlerOption and ReaderOption support to all existing event reader implementations
This commit is contained in:
Johannes Batzill 2023-01-28 13:34:04 -08:00 committed by GitHub
parent 4277c4c66b
commit c3dadcce7b
15 changed files with 382 additions and 288 deletions

58
events/options.go Normal file
View File

@ -0,0 +1,58 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package events
import (
"time"
"github.com/harness/gitness/stream"
)
/*
* Expose event package options to simplify usage for consumers by hiding the stream package.
* Since we only forward the options, event options are simply aliases of stream options.
*/
// ReaderOption can be used to configure event readers.
type ReaderOption stream.ConsumerOption
func toStreamConsumerOptions(opts []ReaderOption) []stream.ConsumerOption {
streamOpts := make([]stream.ConsumerOption, len(opts))
for i, opt := range opts {
streamOpts[i] = stream.ConsumerOption(opt)
}
return streamOpts
}
// WithConcurrency sets up the concurrency of the reader.
func WithConcurrency(concurrency int) ReaderOption {
return stream.WithConcurrency(concurrency)
}
// WithHandlerOptions sets up the default options for event handlers.
func WithHandlerOptions(opts ...HandlerOption) ReaderOption {
return stream.WithHandlerOptions(toStreamHandlerOptions(opts)...)
}
// HandlerOption can be used to configure event handlers.
type HandlerOption stream.HandlerOption
func toStreamHandlerOptions(opts []HandlerOption) []stream.HandlerOption {
streamOpts := make([]stream.HandlerOption, len(opts))
for i, opt := range opts {
streamOpts[i] = stream.HandlerOption(opt)
}
return streamOpts
}
// WithMaxRetries can be used to set the max retry count for a specific event handler.
func WithMaxRetries(maxRetries int) HandlerOption {
return stream.WithMaxRetries(maxRetries)
}
// WithIdleTimeout can be used to set the idle timeout for a specific event handler.
func WithIdleTimeout(timeout time.Duration) HandlerOption {
return stream.WithIdleTimeout(timeout)
}

View File

@ -10,7 +10,6 @@ import (
"encoding/gob"
"errors"
"fmt"
"time"
"github.com/rs/zerolog/log"
)
@ -131,11 +130,11 @@ func (d *ReaderCanceler) Cancel() error {
// Reader specifies the minimum functionality a reader should expose.
// NOTE: we don't want to enforce any event registration methods here, allowing full control for customized readers.
type Reader interface {
SetConcurrency(concurrency int) error
SetProcessingTimeout(timeout time.Duration) error
SetMaxRetryCount(retryCount int64) error
Configure(opts ...ReaderOption)
}
type HandlerFunc[T interface{}] func(context.Context, *Event[T]) error
// GenericReader represents an event reader that supports registering type safe handlers
// for an arbitrary set of custom events within a given event category using the ReaderRegisterEvent method.
// NOTE: Optimally this should be an interface with RegisterEvent[T] method, but that's currently not possible in go.
@ -149,7 +148,7 @@ type GenericReader struct {
// This method allows to register type safe handlers without the need of handling the raw stream payload.
// NOTE: Generic arguments are not allowed for struct methods, hence pass the reader as input parameter.
func ReaderRegisterEvent[T interface{}](reader *GenericReader,
eventType EventType, fn func(context.Context, *Event[T]) error) error {
eventType EventType, fn HandlerFunc[T], opts ...HandlerOption) error {
streamID := getStreamID(reader.category, eventType)
// register handler for event specific stream.
@ -208,17 +207,9 @@ func ReaderRegisterEvent[T interface{}](reader *GenericReader,
// any other error we return as is
return err
})
}, toStreamHandlerOptions(opts)...)
}
func (r *GenericReader) SetConcurrency(concurrency int) error {
return r.streamConsumer.SetConcurrency(concurrency)
}
func (r *GenericReader) SetProcessingTimeout(timeout time.Duration) error {
return r.streamConsumer.SetProcessingTimeout(timeout)
}
func (r *GenericReader) SetMaxRetryCount(retryCount int64) error {
return r.streamConsumer.SetMaxRetryCount(retryCount)
func (r *GenericReader) Configure(opts ...ReaderOption) {
r.streamConsumer.Configure(toStreamConsumerOptions(opts)...)
}

View File

@ -6,7 +6,6 @@ package events
import (
"context"
"time"
"github.com/harness/gitness/stream"
)
@ -18,10 +17,8 @@ type StreamProducer interface {
// StreamConsumer is an abstraction of a consumer from the streams package.
type StreamConsumer interface {
Register(streamID string, handler stream.HandlerFunc) error
SetConcurrency(int) error
SetProcessingTimeout(timeout time.Duration) error
SetMaxRetryCount(retryCount int64) error
Register(streamID string, handler stream.HandlerFunc, opts ...stream.HandlerOption) error
Configure(opts ...stream.ConsumerOption)
Start(ctx context.Context) error
Errors() <-chan error
Infos() <-chan string

View File

@ -68,7 +68,7 @@ func provideSystemRedis(config Config, redisClient redis.UniversalClient) (*Syst
func newMemoryStreamConsumerFactoryMethod(broker *stream.MemoryBroker, namespace string) StreamConsumerFactoryFunc {
return func(groupName string, consumerName string) (StreamConsumer, error) {
return stream.NewMemoryConsumer(broker, namespace, groupName), nil
return stream.NewMemoryConsumer(broker, namespace, groupName)
}
}

View File

@ -31,8 +31,9 @@ func (r *Reporter) BranchCreated(ctx context.Context, payload *BranchCreatedPayl
log.Ctx(ctx).Debug().Msgf("reported branch created event with id '%s'", eventID)
}
func (r *Reader) RegisterBranchCreated(fn func(context.Context, *events.Event[*BranchCreatedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, BranchCreatedEvent, fn)
func (r *Reader) RegisterBranchCreated(fn events.HandlerFunc[*BranchCreatedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, BranchCreatedEvent, fn, opts...)
}
const BranchUpdatedEvent events.EventType = "branch-updated"
@ -56,8 +57,9 @@ func (r *Reporter) BranchUpdated(ctx context.Context, payload *BranchUpdatedPayl
log.Ctx(ctx).Debug().Msgf("reported branch updated event with id '%s'", eventID)
}
func (r *Reader) RegisterBranchUpdated(fn func(context.Context, *events.Event[*BranchUpdatedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn)
func (r *Reader) RegisterBranchUpdated(fn events.HandlerFunc[*BranchUpdatedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn, opts...)
}
const BranchDeletedEvent events.EventType = "branch-deleted"
@ -79,6 +81,7 @@ func (r *Reporter) BranchDeleted(ctx context.Context, payload *BranchDeletedPayl
log.Ctx(ctx).Debug().Msgf("reported branch deleted event with id '%s'", eventID)
}
func (r *Reader) RegisterBranchDeleted(fn func(context.Context, *events.Event[*BranchDeletedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn)
func (r *Reader) RegisterBranchDeleted(fn events.HandlerFunc[*BranchDeletedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn, opts...)
}

View File

@ -5,8 +5,6 @@
package events
import (
"time"
"github.com/harness/gitness/events"
)
@ -27,14 +25,6 @@ type Reader struct {
innerReader *events.GenericReader
}
func (r *Reader) SetConcurrency(concurrency int) error {
return r.innerReader.SetConcurrency(concurrency)
}
func (r *Reader) SetProcessingTimeout(timeout time.Duration) error {
return r.innerReader.SetProcessingTimeout(timeout)
}
func (r *Reader) SetMaxRetryCount(retryCount int64) error {
return r.innerReader.SetMaxRetryCount(retryCount)
func (r *Reader) Configure(opts ...events.ReaderOption) {
r.innerReader.Configure(opts...)
}

View File

@ -31,8 +31,9 @@ func (r *Reporter) TagCreated(ctx context.Context, payload *TagCreatedPayload) {
log.Ctx(ctx).Debug().Msgf("reported tag created event with id '%s'", eventID)
}
func (r *Reader) RegisterTagCreated(fn func(context.Context, *events.Event[*TagCreatedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, TagCreatedEvent, fn)
func (r *Reader) RegisterTagCreated(fn events.HandlerFunc[*TagCreatedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, TagCreatedEvent, fn, opts...)
}
const TagUpdatedEvent events.EventType = "tag-updated"
@ -56,8 +57,9 @@ func (r *Reporter) TagUpdated(ctx context.Context, payload *TagUpdatedPayload) {
log.Ctx(ctx).Debug().Msgf("reported tag updated event with id '%s'", eventID)
}
func (r *Reader) RegisterTagUpdated(fn func(context.Context, *events.Event[*TagUpdatedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, TagUpdatedEvent, fn)
func (r *Reader) RegisterTagUpdated(fn events.HandlerFunc[*TagUpdatedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, TagUpdatedEvent, fn, opts...)
}
const TagDeletedEvent events.EventType = "tag-deleted"
@ -79,6 +81,7 @@ func (r *Reporter) TagDeleted(ctx context.Context, payload *TagDeletedPayload) {
log.Ctx(ctx).Debug().Msgf("reported tag deleted event with id '%s'", eventID)
}
func (r *Reader) RegisterTagDeleted(fn func(context.Context, *events.Event[*TagDeletedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, TagDeletedEvent, fn)
func (r *Reader) RegisterTagDeleted(fn events.HandlerFunc[*TagDeletedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, TagDeletedEvent, fn, opts...)
}

View File

@ -44,8 +44,9 @@ func (r *Reporter) Created(ctx context.Context, payload *CreatedPayload) {
log.Ctx(ctx).Debug().Msgf("reported pull request created event with id '%s'", eventID)
}
func (r *Reader) RegisterCreated(fn func(context.Context, *events.Event[*CreatedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, CreatedEvent, fn)
func (r *Reader) RegisterCreated(fn events.HandlerFunc[*CreatedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, CreatedEvent, fn, opts...)
}
const TitleChangedEvent events.EventType = "title-changed"
@ -70,8 +71,9 @@ func (r *Reporter) TitleChanged(ctx context.Context, payload *TitleChangedPayloa
log.Ctx(ctx).Debug().Msgf("reported pull request title changed event with id '%s'", eventID)
}
func (r *Reader) RegisterTitleChanged(fn func(context.Context, *events.Event[*TitleChangedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, TitleChangedEvent, fn)
func (r *Reader) RegisterTitleChanged(fn events.HandlerFunc[*TitleChangedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, TitleChangedEvent, fn, opts...)
}
const BranchUpdatedEvent events.EventType = "branch-updated"
@ -96,8 +98,9 @@ func (r *Reporter) BranchUpdated(ctx context.Context, payload *BranchUpdatedPayl
log.Ctx(ctx).Debug().Msgf("reported pull request branch updated event with id '%s'", eventID)
}
func (r *Reader) RegisterBranchUpdated(fn func(context.Context, *events.Event[*BranchUpdatedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn)
func (r *Reader) RegisterBranchUpdated(fn events.HandlerFunc[*BranchUpdatedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, BranchUpdatedEvent, fn, opts...)
}
const StateChangedEvent events.EventType = "state-changed"
@ -126,8 +129,9 @@ func (r *Reporter) StateChanged(ctx context.Context, payload *StateChangedPayloa
log.Ctx(ctx).Debug().Msgf("reported pull request state changed event with id '%s'", eventID)
}
func (r *Reader) RegisterStateChanged(fn func(context.Context, *events.Event[*StateChangedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, StateChangedEvent, fn)
func (r *Reader) RegisterStateChanged(fn events.HandlerFunc[*StateChangedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, StateChangedEvent, fn, opts...)
}
const BranchDeletedEvent events.EventType = "branch-deleted"
@ -151,8 +155,9 @@ func (r *Reporter) BranchDeleted(ctx context.Context, payload *BranchDeletedPayl
log.Ctx(ctx).Debug().Msgf("reported pull request branch deleted event with id '%s'", eventID)
}
func (r *Reader) RegisterBranchDeleted(fn func(context.Context, *events.Event[*BranchDeletedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn)
func (r *Reader) RegisterBranchDeleted(fn events.HandlerFunc[*BranchDeletedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, BranchDeletedEvent, fn, opts...)
}
const ReviewSubmittedEvent events.EventType = "review-submitted"
@ -177,8 +182,9 @@ func (r *Reporter) ReviewSubmitted(ctx context.Context, payload *ReviewSubmitted
log.Ctx(ctx).Debug().Msgf("reported pull request review submitted with id '%s'", eventID)
}
func (r *Reader) RegisterReviewSubmitted(fn func(context.Context, *events.Event[*ReviewSubmittedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, ReviewSubmittedEvent, fn)
func (r *Reader) RegisterReviewSubmitted(fn events.HandlerFunc[*ReviewSubmittedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, ReviewSubmittedEvent, fn, opts...)
}
const MergedEvent events.EventType = "merged"
@ -203,6 +209,7 @@ func (r *Reporter) Merged(ctx context.Context, payload *MergedPayload) {
log.Ctx(ctx).Debug().Msgf("reported pull request merged event with id '%s'", eventID)
}
func (r *Reader) RegisterMerged(fn func(context.Context, *events.Event[*MergedPayload]) error) error {
return events.ReaderRegisterEvent(r.innerReader, MergedEvent, fn)
func (r *Reader) RegisterMerged(fn events.HandlerFunc[*MergedPayload],
opts ...events.HandlerOption) error {
return events.ReaderRegisterEvent(r.innerReader, MergedEvent, fn, opts...)
}

View File

@ -5,8 +5,6 @@
package events
import (
"time"
"github.com/harness/gitness/events"
)
@ -25,14 +23,6 @@ type Reader struct {
innerReader *events.GenericReader
}
func (r *Reader) SetConcurrency(concurrency int) error {
return r.innerReader.SetConcurrency(concurrency)
}
func (r *Reader) SetProcessingTimeout(timeout time.Duration) error {
return r.innerReader.SetProcessingTimeout(timeout)
}
func (r *Reader) SetMaxRetryCount(retryCount int64) error {
return r.innerReader.SetMaxRetryCount(retryCount)
func (r *Reader) Configure(opts ...events.ReaderOption) {
r.innerReader.Configure(opts...)
}

View File

@ -13,6 +13,7 @@ import (
gitevents "github.com/harness/gitness/internal/events/git"
pullreqevents "github.com/harness/gitness/internal/events/pullreq"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/stream"
"github.com/harness/gitness/types"
"github.com/jmoiron/sqlx"
@ -57,10 +58,13 @@ func New(ctx context.Context,
const groupGit = "gitness:pullreq:git"
_, err = gitReaderFactory.Launch(ctx, groupGit, config.InstanceID,
func(r *gitevents.Reader) error {
const processingTimeout = 15 * time.Second
_ = r.SetConcurrency(1)
_ = r.SetMaxRetryCount(3)
_ = r.SetProcessingTimeout(processingTimeout)
const idleTimeout = 15 * time.Second
r.Configure(
stream.WithConcurrency(1),
stream.WithHandlerOptions(
stream.WithIdleTimeout(idleTimeout),
stream.WithMaxRetries(3),
))
_ = r.RegisterBranchUpdated(service.triggerPullReqBranchUpdate)
_ = r.RegisterBranchDeleted(service.closePullReqBranchDelete)
@ -76,11 +80,13 @@ func New(ctx context.Context,
const groupActivity = "gitness:pullreq:activity"
_, err = pullreqEvReaderFactory.Launch(ctx, groupActivity, config.InstanceID,
func(r *pullreqevents.Reader) error {
const processingTimeout = 10 * time.Second
_ = r.SetConcurrency(1)
_ = r.SetMaxRetryCount(3)
_ = r.SetProcessingTimeout(processingTimeout)
const idleTimeout = 10 * time.Second
r.Configure(
stream.WithConcurrency(1),
stream.WithHandlerOptions(
stream.WithIdleTimeout(idleTimeout),
stream.WithMaxRetries(3),
))
_ = r.RegisterBranchUpdated(service.addActivityBranchUpdate)
_ = r.RegisterBranchDeleted(service.addActivityBranchDelete)
_ = r.RegisterStateChanged(service.addActivityStateChange)
@ -99,10 +105,13 @@ func New(ctx context.Context,
const groupPullReqHeadRef = "gitness:pullreq:headref"
_, err = pullreqEvReaderFactory.Launch(ctx, groupPullReqHeadRef, config.InstanceID,
func(r *pullreqevents.Reader) error {
const processingTimeout = 10 * time.Second
_ = r.SetConcurrency(1)
_ = r.SetMaxRetryCount(3)
_ = r.SetProcessingTimeout(processingTimeout)
const idleTimeout = 10 * time.Second
r.Configure(
stream.WithConcurrency(1),
stream.WithHandlerOptions(
stream.WithIdleTimeout(idleTimeout),
stream.WithMaxRetries(3),
))
_ = r.RegisterCreated(service.createHeadRefCreated)
_ = r.RegisterBranchUpdated(service.updateHeadRefBranchUpdate)

View File

@ -16,17 +16,17 @@ import (
gitevents "github.com/harness/gitness/internal/events/git"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/internal/url"
"github.com/harness/gitness/stream"
)
const (
eventsReaderGroupName = "gitness:webhook"
processingTimeout = 2 * time.Minute
)
type Config struct {
EventReaderName string `envconfig:"GITNESS_WEBHOOK_EVENT_READER_NAME"`
Concurrency int `envconfig:"GITNESS_WEBHOOK_CONCURRENCY" default:"4"`
MaxRetryCount int64 `envconfig:"GITNESS_WEBHOOK_MAX_RETRY_COUNT" default:"3"`
MaxRetries int `envconfig:"GITNESS_WEBHOOK_MAX_RETRIES" default:"3"`
AllowPrivateNetwork bool `envconfig:"GITNESS_WEBHOOK_ALLOW_PRIVATE_NETWORK" default:"false"`
AllowLoopback bool `envconfig:"GITNESS_WEBHOOK_ALLOW_LOOPBACK" default:"false"`
}
@ -41,8 +41,8 @@ func (c *Config) Validate() error {
if c.Concurrency < 1 {
return errors.New("config.Concurrency has to be a positive number")
}
if c.MaxRetryCount < 0 {
return errors.New("config.MaxRetryCount can't be negative")
if c.MaxRetries < 0 {
return errors.New("config.MaxRetries can't be negative")
}
return nil
@ -86,10 +86,13 @@ func NewService(ctx context.Context, config Config,
}
canceler, err := gitReaderFactory.Launch(ctx, eventsReaderGroupName, config.EventReaderName,
func(r *gitevents.Reader) error {
// configure reader
_ = r.SetConcurrency(config.Concurrency)
_ = r.SetMaxRetryCount(config.MaxRetryCount)
_ = r.SetProcessingTimeout(processingTimeout)
const idleTimeout = 1 * time.Minute
r.Configure(
stream.WithConcurrency(config.Concurrency),
stream.WithHandlerOptions(
stream.WithIdleTimeout(idleTimeout),
stream.WithMaxRetries(config.MaxRetries),
))
// register events
_ = r.RegisterBranchCreated(service.handleEventBranchCreated)

View File

@ -23,46 +23,63 @@ type memoryMessage struct {
type MemoryConsumer struct {
broker *MemoryBroker
// namespace specifies the namespace of the keys - any stream key will be prefixed with it
namespace string
concurrency int
maxRetryCount int64
groupName string
streams map[string]HandlerFunc
namespace string
// groupName specifies the name of the consumer group.
groupName string
state consumerState
// Config is the generic consumer configuration.
Config ConsumerConfig
// streams is a map of all registered streams and their handlers.
streams map[string]handler
isStarted bool
messageQueue chan memoryMessage
errorCh chan error
infoCh chan string
}
func NewMemoryConsumer(broker *MemoryBroker, namespace string, groupName string) *MemoryConsumer {
func NewMemoryConsumer(broker *MemoryBroker, namespace string, groupName string) (*MemoryConsumer, error) {
if groupName == "" {
return nil, errors.New("groupName can't be empty")
}
const queueCapacity = 500
const errorChCapacity = 64
const infoChCapacity = 64
const concurrency = 1
return &MemoryConsumer{
broker: broker,
namespace: namespace,
concurrency: concurrency,
groupName: groupName,
streams: make(map[string]HandlerFunc),
state: consumerStateSetup,
streams: map[string]handler{},
Config: defaultConfig,
isStarted: false,
messageQueue: make(chan memoryMessage, queueCapacity),
errorCh: make(chan error, errorChCapacity),
infoCh: make(chan string, infoChCapacity),
}, nil
}
func (c *MemoryConsumer) Configure(opts ...ConsumerOption) {
if c.isStarted {
return
}
for _, opt := range opts {
opt.apply(&c.Config)
}
}
func (c *MemoryConsumer) Register(streamID string, handler HandlerFunc) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
func (c *MemoryConsumer) Register(streamID string, fn HandlerFunc, opts ...HandlerOption) error {
if c.isStarted {
return ErrAlreadyStarted
}
if streamID == "" {
return errors.New("streamID can't be empty")
}
if handler == nil {
return errors.New("handler can't be empty")
if fn == nil {
return errors.New("fn can't be empty")
}
// transpose streamID to key namespace - no need to keep inner streamID
@ -71,59 +88,29 @@ func (c *MemoryConsumer) Register(streamID string, handler HandlerFunc) error {
return fmt.Errorf("consumer is already registered for '%s' (full stream '%s')", streamID, transposedStreamID)
}
c.streams[transposedStreamID] = handler
return nil
}
func (c *MemoryConsumer) SetConcurrency(concurrency int) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
config := c.Config.DefaultHandlerConfig
for _, opt := range opts {
opt.apply(&config)
}
if concurrency < 1 || concurrency > MaxConcurrency {
return fmt.Errorf("concurrency has to be between 1 and %d (inclusive)", MaxConcurrency)
c.streams[transposedStreamID] = handler{
handle: fn,
config: config,
}
c.concurrency = concurrency
return nil
}
func (c *MemoryConsumer) SetMaxRetryCount(retryCount int64) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
}
if retryCount < 1 || retryCount > MaxRetryCount {
return fmt.Errorf("max retry count has to be between 1 and %d (inclusive)", MaxRetryCount)
}
c.maxRetryCount = retryCount
return nil
}
func (c *MemoryConsumer) SetProcessingTimeout(timeout time.Duration) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
}
// we don't have an idle timeout for this implementation
return nil
}
func (c *MemoryConsumer) Start(ctx context.Context) error {
if err := checkConsumerStateTransition(c.state, consumerStateStarted); err != nil {
return err
if c.isStarted {
return ErrAlreadyStarted
}
if len(c.streams) == 0 {
return errors.New("no streams registered")
}
// update state to started before starting go routines (can't error out from here)
c.state = consumerStateStarted
// mark as started before starting go routines (can't error out from here)
c.isStarted = true
wg := &sync.WaitGroup{}
@ -137,7 +124,7 @@ func (c *MemoryConsumer) Start(ctx context.Context) error {
}
// start workers
for i := 0; i < c.concurrency; i++ {
for i := 0; i < c.Config.Concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
@ -150,9 +137,6 @@ func (c *MemoryConsumer) Start(ctx context.Context) error {
// wait for all go routines to complete
wg.Wait()
// update state to finished
c.state = consumerStateFinished
close(c.messageQueue)
close(c.infoCh)
close(c.errorCh)
@ -184,7 +168,7 @@ func (c *MemoryConsumer) consume(ctx context.Context) {
case <-ctx.Done():
return
case m := <-c.messageQueue:
fn, ok := c.streams[m.streamID]
handler, ok := c.streams[m.streamID]
if !ok {
// we only take messages from registered streams, this should never happen.
// WARNING this will discard the message
@ -202,14 +186,14 @@ func (c *MemoryConsumer) consume(ctx context.Context) {
}
}()
return fn(ctx, m.id, m.values)
return handler.handle(ctx, m.id, m.values)
}()
if err != nil {
c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w",
m.id, m.streamID, m.retries, err))
if m.retries >= c.maxRetryCount {
if m.retries >= int64(handler.config.maxRetries) {
c.pushError(fmt.Errorf(
"discard message with id '%s' from stream '%s' - failed %d retries",
m.id, m.streamID, m.retries))
@ -223,7 +207,7 @@ func (c *MemoryConsumer) consume(ctx context.Context) {
// IMPORTANT: this won't requeue to broker, only in this consumer's queue!
go func() {
// TODO: linear/exponential backoff relative to retry count might be good
time.Sleep(5 * time.Second)
time.Sleep(handler.config.idleTimeout)
c.messageQueue <- m
}()
}

90
stream/options.go Normal file
View File

@ -0,0 +1,90 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package stream
import (
"fmt"
"time"
)
const (
// MaxConcurrency is the max number of concurrent go routines (for message handling) for a single stream consumer.
MaxConcurrency = 64
// MaxMaxRetries is the max number of retries of a message for a single consumer group.
MaxMaxRetries = 64
// MinIdleTimeout is the minumum time that can be configured as idle timeout for a stream consumer.
MinIdleTimeout = 5 * time.Second
)
// ConsumerOption is used to configure consumers.
type ConsumerOption interface {
apply(*ConsumerConfig)
}
// consumerOptionFunc allows to have functions implement the ConsumerOption interface.
type consumerOptionFunc func(*ConsumerConfig)
// Apply calls f(config).
func (f consumerOptionFunc) apply(config *ConsumerConfig) {
f(config)
}
// WithConcurrency sets up the concurrency of the stream consumer.
func WithConcurrency(concurrency int) ConsumerOption {
if concurrency < 1 || concurrency > MaxConcurrency {
// missconfiguration - panic to keep options clean
panic(fmt.Sprintf("provided concurrency %d is invalid - has to be between 1 and %d",
concurrency, MaxConcurrency))
}
return consumerOptionFunc(func(c *ConsumerConfig) {
c.Concurrency = concurrency
})
}
// WithHandlerOptions sets up the default handler options of a stream consumer.
func WithHandlerOptions(opts ...HandlerOption) ConsumerOption {
return consumerOptionFunc(func(c *ConsumerConfig) {
for _, opt := range opts {
opt.apply(&c.DefaultHandlerConfig)
}
})
}
// HandlerOption is used to configure the handler consuming a single stream.
type HandlerOption interface {
apply(*HandlerConfig)
}
// handlerOptionFunc allows to have functions implement the HandlerOption interface.
type handlerOptionFunc func(*HandlerConfig)
// Apply calls f(config).
func (f handlerOptionFunc) apply(config *HandlerConfig) {
f(config)
}
// WithMaxRetries can be used to set the max retry count for a specific handler.
func WithMaxRetries(maxRetries int) HandlerOption {
if maxRetries < 0 || maxRetries > MaxMaxRetries {
// missconfiguration - panic to keep options clean
panic(fmt.Sprintf("provided maxRetries %d is invalid - has to be between 0 and %d", maxRetries, MaxMaxRetries))
}
return handlerOptionFunc(func(c *HandlerConfig) {
c.maxRetries = maxRetries
})
}
// WithIdleTimeout can be used to set the idle timeout for a specific handler.
func WithIdleTimeout(timeout time.Duration) HandlerOption {
if timeout < MinIdleTimeout {
// missconfiguration - panic to keep options clean
panic(fmt.Sprintf("provided timeout %d is invalid - has to be longer than %s", timeout, MinIdleTimeout))
}
return handlerOptionFunc(func(c *HandlerConfig) {
c.idleTimeout = timeout
})
}

View File

@ -22,22 +22,18 @@ type RedisConsumer struct {
rdb redis.UniversalClient
// namespace specifies the namespace of the keys - any stream key will be prefixed with it
namespace string
// groupName specifies the name of the consumer group
// groupName specifies the name of the consumer group.
groupName string
// consumerName specifies the name of the consumer
// consumerName specifies the name of the consumer.
consumerName string
// processingTimeout specifies the maximum duration a message stays read but unacknowleged
// before it can be claimed by others.
processingTimeout time.Duration
// streams is a map of all streams to consum and their handler function
streams map[string]HandlerFunc
// concurrency specifies the number of worker go routines
concurrency int
// maxRetryCount specifies the max number an event is retried
maxRetryCount int64
// Config is the generic consumer configuration.
Config ConsumerConfig
state consumerState
// streams is a map of all registered streams and their handlers.
streams map[string]handler
isStarted bool
messageQueue chan message
errorCh chan error
infoCh chan string
@ -58,34 +54,40 @@ func NewRedisConsumer(rdb redis.UniversalClient, namespace string,
const queueCapacity = 500
const errorChCapacity = 64
const infoChCapacity = 64
const concurrency = 2
const processingTimeout = 5 * time.Minute
return &RedisConsumer{
rdb: rdb,
namespace: namespace,
groupName: groupName,
consumerName: consumerName,
streams: map[string]HandlerFunc{},
processingTimeout: processingTimeout,
concurrency: concurrency,
state: consumerStateSetup,
messageQueue: make(chan message, queueCapacity),
errorCh: make(chan error, errorChCapacity),
infoCh: make(chan string, infoChCapacity),
rdb: rdb,
namespace: namespace,
groupName: groupName,
consumerName: consumerName,
streams: map[string]handler{},
Config: defaultConfig,
isStarted: false,
messageQueue: make(chan message, queueCapacity),
errorCh: make(chan error, errorChCapacity),
infoCh: make(chan string, infoChCapacity),
}, nil
}
func (c *RedisConsumer) Register(streamID string, handler HandlerFunc) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
func (c *RedisConsumer) Configure(opts ...ConsumerOption) {
if c.isStarted {
return
}
for _, opt := range opts {
opt.apply(&c.Config)
}
}
func (c *RedisConsumer) Register(streamID string, fn HandlerFunc, opts ...HandlerOption) error {
if c.isStarted {
return ErrAlreadyStarted
}
if streamID == "" {
return errors.New("streamID can't be empty")
}
if handler == nil {
return errors.New("handler can't be empty")
if fn == nil {
return errors.New("fn can't be empty")
}
// transpose streamID to key namespace - no need to keep inner streamID
@ -94,55 +96,23 @@ func (c *RedisConsumer) Register(streamID string, handler HandlerFunc) error {
return fmt.Errorf("consumer is already registered for '%s' (redis stream '%s')", streamID, transposedStreamID)
}
c.streams[transposedStreamID] = handler
return nil
}
func (c *RedisConsumer) SetConcurrency(concurrency int) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
// create final config for handler
config := c.Config.DefaultHandlerConfig
for _, opt := range opts {
opt.apply(&config)
}
if concurrency < 1 || concurrency > MaxConcurrency {
return fmt.Errorf("concurrency has to be between 1 and %d (inclusive)", MaxConcurrency)
c.streams[transposedStreamID] = handler{
handle: fn,
config: config,
}
c.concurrency = concurrency
return nil
}
func (c *RedisConsumer) SetProcessingTimeout(timeout time.Duration) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
}
if timeout < MinProcessingTimeout {
return fmt.Errorf("processing timeout %v is too short, it has to be at least %v", timeout, MinProcessingTimeout)
}
c.processingTimeout = timeout
return nil
}
func (c *RedisConsumer) SetMaxRetryCount(retryCount int64) error {
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
return err
}
if retryCount < 1 || retryCount > MaxRetryCount {
return fmt.Errorf("max retry count has to be between 1 and %d (inclusive)", MaxRetryCount)
}
c.maxRetryCount = retryCount
return nil
}
func (c *RedisConsumer) Start(ctx context.Context) error {
if err := checkConsumerStateTransition(c.state, consumerStateStarted); err != nil {
return err
if c.isStarted {
return ErrAlreadyStarted
}
if len(c.streams) == 0 {
@ -163,8 +133,8 @@ func (c *RedisConsumer) Start(ctx context.Context) error {
return err
}
// update state to started before starting go routines (can't error out from here)
c.state = consumerStateStarted
// mark as started before starting go routines (can't error out from here)
c.isStarted = true
wg := &sync.WaitGroup{}
@ -179,11 +149,15 @@ func (c *RedisConsumer) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
// launch redis message reclaimer, it will finish when the ctx is done
c.reclaimer(ctx, time.Minute)
// launch redis message reclaimer, it will finish when the ctx is done.
// IMPORTANT: Keep reclaim interval small for now to support faster retries => higher load on redis!
// TODO: Make retries local by default with opt-in cross-instance retries.
// https://harness.atlassian.net/browse/SCM-83
const reclaimInterval = 10 * time.Second
c.reclaimer(ctx, reclaimInterval)
}()
for i := 0; i < c.concurrency; i++ {
for i := 0; i < c.Config.Concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
@ -196,9 +170,6 @@ func (c *RedisConsumer) Start(ctx context.Context) error {
// wait for all go routines to complete
wg.Wait()
// update state to finished
c.state = consumerStateFinished
// close all channels
close(c.messageQueue)
close(c.errorCh)
@ -372,31 +343,29 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura
case <-ctx.Done():
return
case <-reclaimTimer.C:
for streamID := range c.streams {
for streamID, handler := range c.streams {
resPending, errPending := c.rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: streamID,
Group: c.groupName,
Start: start,
End: end,
Idle: c.processingTimeout,
Idle: handler.config.idleTimeout,
Count: int64(count),
}).Result()
if errPending != nil && !errors.Is(errPending, redis.Nil) {
c.pushError(fmt.Errorf("failed to fetch pending messages: %w", errPending))
reclaimTimer.Reset(reclaimInterval)
break
continue
}
if len(resPending) == 0 {
reclaimTimer.Reset(reclaimInterval)
break
continue
}
// It's safe to change start of the requested range for the next iteration to oldest message.
start = resPending[0].ID
for _, resMessage := range resPending {
if resMessage.RetryCount > c.maxRetryCount {
if resMessage.RetryCount > int64(handler.config.maxRetries) {
// Retry count gets increased after every XCLAIM.
// Large retry count might mean there is something wrong with the message, so we'll XACK it.
// WARNING this will discard the message!
@ -406,9 +375,10 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura
"failed to force acknowledge (discard) message '%s' (Retries: %d) in stream '%s': %w",
resMessage.ID, resMessage.RetryCount, streamID, errAck))
} else {
retryCount := resMessage.RetryCount - 1 // redis is counting this execution as retry
c.pushError(fmt.Errorf(
"force acknowledged (discarded) message '%s' (Retries: %d) in stream '%s'",
resMessage.ID, resMessage.RetryCount, streamID))
resMessage.ID, retryCount, streamID))
}
continue
}
@ -418,7 +388,7 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura
Stream: streamID,
Group: c.groupName,
Consumer: c.consumerName,
MinIdle: c.processingTimeout,
MinIdle: handler.config.idleTimeout,
Messages: []string{resMessage.ID},
}).Result()
@ -475,9 +445,9 @@ func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Dura
} else {
count = baseCount
}
reclaimTimer.Reset(reclaimInterval)
}
reclaimTimer.Reset(reclaimInterval)
}
}
}
@ -494,7 +464,7 @@ func (c *RedisConsumer) consumer(ctx context.Context) {
return
}
fn, ok := c.streams[m.streamID]
handler, ok := c.streams[m.streamID]
if !ok {
// we don't want to ack the message
// maybe someone else can claim and process it (worst case it expires)
@ -512,7 +482,7 @@ func (c *RedisConsumer) consumer(ctx context.Context) {
}
}()
return fn(ctx, m.id, m.values)
return handler.handle(ctx, m.id, m.values)
}()
if err != nil {
c.pushError(fmt.Errorf("failed to process message '%s' in stream '%s': %w", m.id, m.streamID, err))

View File

@ -6,24 +6,53 @@ package stream
import (
"context"
"errors"
"fmt"
"time"
)
const (
// MaxConcurrency is the max number of concurrent go routines (for message handling) for a single stream consumer.
MaxConcurrency = 64
var (
ErrAlreadyStarted = errors.New("consumer already started")
// MaxRetryCount is the max number of retries of an event for a single consumer group.
MaxRetryCount = 64
// MinProcessingTimeout is the minumum time that can be configured as processing timeout for a stream consumer.
MinProcessingTimeout = 1 * time.Minute
defaultConfig = ConsumerConfig{
Concurrency: 2,
DefaultHandlerConfig: HandlerConfig{
idleTimeout: 1 * time.Minute,
maxRetries: 2,
},
}
)
// ConsumerConfig defines the configuration of a consumer containing externally exposed values
// that can be configured using the available ConsumerOptions.
type ConsumerConfig struct {
// Concurrency specifies the number of worker go routines executing stream handlers.
Concurrency int
// DefaultHandlerConfig is the default config used for stream handlers.
DefaultHandlerConfig HandlerConfig
}
// HandlerConfig defines the configuration for a single stream handler containing externally exposed values
// that can be configured using the available HandlerOptions.
type HandlerConfig struct {
// idleTimeout specifies the maximum duration a message stays read but unacknowleged
// before it can be claimed by others.
idleTimeout time.Duration
// maxRetries specifies the max number a stream message is retried.
maxRetries int
}
// HandlerFunc defines the signature of a function handling stream messages.
type HandlerFunc func(ctx context.Context, messageID string, payload map[string]interface{}) error
// handler defines a handler of a single stream.
type handler struct {
handle HandlerFunc
config HandlerConfig
}
// message is used internally for passing stream messages via channels.
type message struct {
streamID string
@ -35,33 +64,3 @@ type message struct {
func transposeStreamID(namespace string, streamID string) string {
return fmt.Sprintf("%s:%s", namespace, streamID)
}
// consumerState specifies the different states of a consumer.
type consumerState string
const (
// consumerStateSetup defines the state in which the consumer is being setup (Register, SetConcurrency, ...).
// In other words, it's the state before the consumer was started.
consumerStateSetup consumerState = "setup"
// consumerStateStarted defines the state after the consumer was started (context not yet canceled).
consumerStateStarted consumerState = "started"
// consumerStateFinished defines the state after the consumer has been stopped (context canceled).
consumerStateFinished consumerState = "finished"
)
// checkConsumerStateTransition returns an error in case the state transition is not allowed, nil otherwise.
// It is used to avoid that invalid operations are being executed in a given state (e.g. Register(...) when started).
func checkConsumerStateTransition(current, updated consumerState) error {
switch {
case current == consumerStateSetup && updated == consumerStateSetup:
return nil
case current == consumerStateSetup && updated == consumerStateStarted:
return nil
case current == consumerStateStarted && updated == consumerStateFinished:
return nil
default:
return fmt.Errorf("consumer state transition from '%s' to '%s' is not possible", current, updated)
}
}