drone/stream/memory_consumer.go
Johannes Batzill 1f1118f624 [Webhook] Add Event Processing and Retrigger (#148)
This change includes:
- webhook event processor (includes actual webhook execution)
- webhook retrigger API allowing to retrigger previous webhook executions
2023-01-04 14:43:36 -08:00

243 lines
6.0 KiB
Go

// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package stream
import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
)
// memoryMessage extends the message object to allow tracking retries.
type memoryMessage struct {
	// message is the embedded broker message that is handed to the handler.
	message
	// retries counts how often the message has been requeued after a failed
	// handler call. It starts at 0 when the message is read from the broker.
	retries int64
}
// MemoryConsumer consumes streams from a MemoryBroker.
//
// A consumer is configured (Register / Set* methods) while in the setup state
// and then launched once via Start.
type MemoryConsumer struct {
	// broker is the in-memory broker the stream messages are read from.
	broker *MemoryBroker
	// namespace specifies the namespace of the keys - any stream key will be prefixed with it
	namespace string
	// concurrency is the number of worker go routines launched by Start.
	concurrency int
	// maxRetryCount is the retry count at which a failing message is discarded
	// instead of being requeued.
	maxRetryCount int64
	// groupName is passed to the broker when requesting the messages of a stream.
	groupName string
	// streams maps the transposed (namespaced) stream ID to its handler.
	streams map[string]HandlerFunc
	// state tracks the lifecycle of the consumer and guards configuration calls.
	state consumerState
	// messageQueue is the single queue shared by all readers and workers;
	// failed messages are requeued into it for a retry.
	messageQueue chan memoryMessage
	// errorCh carries processing errors to the caller (see Errors).
	errorCh chan error
	// infoCh carries informational messages to the caller (see Infos).
	infoCh chan string
}
// NewMemoryConsumer creates a consumer in the setup state that reads from the
// provided broker, using namespace as stream key prefix and groupName as the
// group handed to the broker.
//
// Defaults: a single worker, a message queue holding up to 500 messages and
// error / info channels buffering up to 64 entries each. The max retry count is
// left at its zero value, so failed messages are discarded without a retry
// unless SetMaxRetryCount is called.
func NewMemoryConsumer(broker *MemoryBroker, namespace string, groupName string) *MemoryConsumer {
	const (
		defaultConcurrency = 1
		queueCapacity      = 500
		errorChCapacity    = 64
		infoChCapacity     = 64
	)

	consumer := new(MemoryConsumer)
	consumer.broker = broker
	consumer.namespace = namespace
	consumer.groupName = groupName
	consumer.concurrency = defaultConcurrency
	consumer.state = consumerStateSetup
	consumer.streams = make(map[string]HandlerFunc)
	consumer.messageQueue = make(chan memoryMessage, queueCapacity)
	consumer.errorCh = make(chan error, errorChCapacity)
	consumer.infoCh = make(chan string, infoChCapacity)

	return consumer
}
// Register attaches handler to the stream identified by streamID.
//
// It is only permitted while the consumer is in the setup state. An error is
// returned if the state check fails, if streamID is empty, if handler is nil,
// or if a handler has already been registered for the same stream.
func (c *MemoryConsumer) Register(streamID string, handler HandlerFunc) error {
	stateErr := checkConsumerStateTransition(c.state, consumerStateSetup)
	switch {
	case stateErr != nil:
		return stateErr
	case streamID == "":
		return errors.New("streamID can't be empty")
	case handler == nil:
		return errors.New("handler can't be empty")
	}

	// Only the namespaced key is stored; the caller-facing streamID isn't needed afterwards.
	key := transposeStreamID(c.namespace, streamID)
	if _, exists := c.streams[key]; exists {
		return fmt.Errorf("consumer is already registered for '%s' (full stream '%s')", streamID, key)
	}

	c.streams[key] = handler
	return nil
}
// SetConcurrency configures how many worker go routines Start launches.
//
// It is only permitted while the consumer is in the setup state, and the value
// has to be within [1, MaxConcurrency].
func (c *MemoryConsumer) SetConcurrency(concurrency int) error {
	stateErr := checkConsumerStateTransition(c.state, consumerStateSetup)
	if stateErr != nil {
		return stateErr
	}

	withinBounds := concurrency >= 1 && concurrency <= MaxConcurrency
	if !withinBounds {
		return fmt.Errorf("concurrency has to be between 1 and %d (inclusive)", MaxConcurrency)
	}

	c.concurrency = concurrency
	return nil
}
// SetMaxRetryCount configures the retry count at which a failing message is
// discarded instead of being requeued.
//
// It is only permitted while the consumer is in the setup state, and the value
// has to be within [1, MaxRetryCount].
func (c *MemoryConsumer) SetMaxRetryCount(retryCount int64) error {
	stateErr := checkConsumerStateTransition(c.state, consumerStateSetup)
	if stateErr != nil {
		return stateErr
	}

	withinBounds := retryCount >= 1 && retryCount <= MaxRetryCount
	if !withinBounds {
		return fmt.Errorf("max retry count has to be between 1 and %d (inclusive)", MaxRetryCount)
	}

	c.maxRetryCount = retryCount
	return nil
}
// SetProcessingTimeout accepts a processing timeout for API compatibility.
//
// The in-memory implementation has no idle timeout, so the value is ignored;
// only the consumer state is verified (the call is rejected outside of setup).
func (c *MemoryConsumer) SetProcessingTimeout(timeout time.Duration) error {
	stateErr := checkConsumerStateTransition(c.state, consumerStateSetup)
	if stateErr != nil {
		return stateErr
	}

	return nil
}
// Start moves the consumer into the started state and launches its go routines:
// one reader per registered stream, c.concurrency workers, and a cleanup
// routine that marks the consumer as finished and closes the error and info
// channels once all readers and workers have returned.
//
// The go routines stop when ctx is cancelled. Start itself doesn't block.
// An error is returned if the state transition isn't allowed or if no stream
// has been registered.
func (c *MemoryConsumer) Start(ctx context.Context) error {
	if err := checkConsumerStateTransition(c.state, consumerStateStarted); err != nil {
		return err
	}

	if len(c.streams) == 0 {
		return errors.New("no streams registered")
	}

	// update state to started before starting go routines (can't error out from here)
	c.state = consumerStateStarted

	wg := &sync.WaitGroup{}

	// start routines to read messages from broker
	for streamID := range c.streams {
		wg.Add(1)
		go func(stream string) {
			defer wg.Done()
			c.reader(ctx, stream)
		}(streamID)
	}

	// start workers
	for i := 0; i < c.concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.consume(ctx)
		}()
	}

	// start cleanup routine
	go func() {
		// wait for all readers and workers to complete
		wg.Wait()

		// update state to finished
		c.state = consumerStateFinished

		// NOTE: messageQueue is intentionally NOT closed.
		// Workers schedule delayed retries from go routines that aren't tracked
		// by wg, so a retry can still send to the queue after we get here - a
		// send on a closed channel would panic. Nobody receives from the queue
		// once the workers are gone, so leaving it open is harmless.
		close(c.infoCh)
		close(c.errorCh)
	}()

	return nil
}
// reader reads the messages of a specific stream from the broker and puts them
// into the single message queue monitored by the workers.
//
// It returns once ctx is cancelled. The hand-over into the message queue is
// cancellation aware too: if the queue is full when ctx is cancelled, the
// reader returns instead of blocking forever (which would also keep the
// consumer from ever finishing). A message taken from the broker at that point
// is dropped, as no worker would process it anymore.
func (c *MemoryConsumer) reader(ctx context.Context, streamID string) {
	streamQueue := c.broker.messages(streamID, c.groupName)
	for {
		select {
		case <-ctx.Done():
			return
		case m := <-streamQueue:
			select {
			case <-ctx.Done():
				return
			case c.messageQueue <- memoryMessage{
				message: m,
				retries: 0,
			}:
			}
		}
	}
}
// consume is the worker loop: it takes messages from the message queue and
// passes them to the handler registered for the message's stream, until ctx is
// cancelled.
//
// Outcome handling:
//   - no handler for the stream: the message is discarded and an error is pushed.
//   - handler panics: the panic is recovered and reported (including the panic
//     value); the message is not retried.
//   - handler returns an error: the error is reported and the message is
//     requeued after a delay, unless it already reached maxRetryCount retries,
//     in which case it is discarded.
func (c *MemoryConsumer) consume(ctx context.Context) {
	// TODO: linear/exponential backoff relative to retry count might be good
	const retryDelay = 5 * time.Second

	for {
		select {
		case <-ctx.Done():
			return
		case m := <-c.messageQueue:
			fn, ok := c.streams[m.streamID]
			if !ok {
				// we only take messages from registered streams, this should never happen.
				// WARNING this will discard the message
				c.pushError(fmt.Errorf("discard message with id '%s' from stream '%s' - doesn't belong to us",
					m.id, m.streamID))
				continue
			}

			err := func() (err error) {
				// Ensure that handlers don't cause panic.
				defer func() {
					if r := recover(); r != nil {
						c.pushError(fmt.Errorf("PANIC when processing message '%s' in stream '%s': %v\n%s",
							m.id, m.streamID, r, debug.Stack()))
					}
				}()

				return fn(ctx, m.id, m.values)
			}()
			if err == nil {
				continue
			}

			c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w",
				m.id, m.streamID, m.retries, err))

			if m.retries >= c.maxRetryCount {
				c.pushError(fmt.Errorf(
					"discard message with id '%s' from stream '%s' - failed %d retries",
					m.id, m.streamID, m.retries))
				continue
			}

			// increase retry count
			m.retries++

			// requeue message for a retry (needs to be in a separate go func to avoid deadlock)
			// IMPORTANT: this won't requeue to broker, only in this consumer's queue!
			go func(retry memoryMessage) {
				timer := time.NewTimer(retryDelay)
				defer timer.Stop()

				// Give up on the retry as soon as the consumer is shutting down:
				// nobody would process the message anymore, and the go routine
				// shouldn't outlive the consumer.
				select {
				case <-ctx.Done():
					return
				case <-timer.C:
				}

				// Don't touch the queue after cancellation - it may already be
				// getting torn down.
				if ctx.Err() != nil {
					return
				}

				select {
				case <-ctx.Done():
				case c.messageQueue <- retry:
				}
			}(m)
		}
	}
}
// Errors exposes the receive-only side of the channel on which the consumer
// reports processing errors.
func (c *MemoryConsumer) Errors() <-chan error {
	return c.errorCh
}
// Infos exposes the receive-only side of the consumer's info channel.
func (c *MemoryConsumer) Infos() <-chan string {
	return c.infoCh
}
// pushError reports err on the error channel without ever blocking the caller:
// if the channel can't take the error right now, the error is dropped.
func (c *MemoryConsumer) pushError(err error) {
	select {
	default:
		// channel can't accept the error at the moment - drop it
	case c.errorCh <- err:
	}
}