mirror of
https://github.com/harness/drone.git
synced 2025-05-04 19:51:52 +08:00
600 lines
18 KiB
Go
600 lines
18 KiB
Go
// Copyright 2022 Harness Inc. All rights reserved.
|
|
// Use of this source code is governed by the Polyform Free Trial License
|
|
// that can be found in the LICENSE.md file for this repository.
|
|
|
|
package stream
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"runtime/debug"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
)
|
|
|
|
// RedisConsumer provides functionality to process Redis streams as part of a consumer group.
|
|
type RedisConsumer struct {
|
|
rdb redis.UniversalClient
|
|
// namespace specifies the namespace of the keys - any stream key will be prefixed with it
|
|
namespace string
|
|
// groupName specifies the name of the consumer group
|
|
groupName string
|
|
// consumerName specifies the name of the consumer
|
|
consumerName string
|
|
// processingTimeout specifies the maximum duration a message stays read but unacknowleged
|
|
// before it can be claimed by others.
|
|
processingTimeout time.Duration
|
|
|
|
// streams is a map of all streams to consum and their handler function
|
|
streams map[string]HandlerFunc
|
|
// concurrency specifies the number of worker go routines
|
|
concurrency int
|
|
// maxRetryCount specifies the max number an event is retried
|
|
maxRetryCount int64
|
|
|
|
state consumerState
|
|
messageQueue chan message
|
|
errorCh chan error
|
|
infoCh chan string
|
|
}
|
|
|
|
// NewRedisConsumer creates new Redis stream consumer. Streams are read with XREADGROUP.
|
|
// It returns channels of info messages and errors. The caller should not block on these channels for too long.
|
|
// These channels are provided mainly for logging.
|
|
func NewRedisConsumer(rdb redis.UniversalClient, namespace string,
|
|
groupName string, consumerName string) (*RedisConsumer, error) {
|
|
if groupName == "" {
|
|
return nil, errors.New("groupName can't be empty")
|
|
}
|
|
if consumerName == "" {
|
|
return nil, errors.New("consumerName can't be empty")
|
|
}
|
|
|
|
const queueCapacity = 500
|
|
const errorChCapacity = 64
|
|
const infoChCapacity = 64
|
|
const concurrency = 2
|
|
const processingTimeout = 5 * time.Minute
|
|
|
|
return &RedisConsumer{
|
|
rdb: rdb,
|
|
namespace: namespace,
|
|
groupName: groupName,
|
|
consumerName: consumerName,
|
|
streams: map[string]HandlerFunc{},
|
|
processingTimeout: processingTimeout,
|
|
concurrency: concurrency,
|
|
state: consumerStateSetup,
|
|
messageQueue: make(chan message, queueCapacity),
|
|
errorCh: make(chan error, errorChCapacity),
|
|
infoCh: make(chan string, infoChCapacity),
|
|
}, nil
|
|
}
|
|
|
|
func (c *RedisConsumer) Register(streamID string, handler HandlerFunc) error {
|
|
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
|
|
return err
|
|
}
|
|
|
|
if streamID == "" {
|
|
return errors.New("streamID can't be empty")
|
|
}
|
|
if handler == nil {
|
|
return errors.New("handler can't be empty")
|
|
}
|
|
|
|
// transpose streamID to key namespace - no need to keep inner streamID
|
|
transposedStreamID := transposeStreamID(c.namespace, streamID)
|
|
if _, ok := c.streams[transposedStreamID]; ok {
|
|
return fmt.Errorf("consumer is already registered for '%s' (redis stream '%s')", streamID, transposedStreamID)
|
|
}
|
|
|
|
c.streams[transposedStreamID] = handler
|
|
return nil
|
|
}
|
|
|
|
func (c *RedisConsumer) SetConcurrency(concurrency int) error {
|
|
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
|
|
return err
|
|
}
|
|
|
|
if concurrency < 1 || concurrency > MaxConcurrency {
|
|
return fmt.Errorf("concurrency has to be between 1 and %d (inclusive)", MaxConcurrency)
|
|
}
|
|
|
|
c.concurrency = concurrency
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RedisConsumer) SetProcessingTimeout(timeout time.Duration) error {
|
|
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
|
|
return err
|
|
}
|
|
|
|
if timeout < MinProcessingTimeout {
|
|
return fmt.Errorf("processing timeout %v is too short, it has to be at least %v", timeout, MinProcessingTimeout)
|
|
}
|
|
|
|
c.processingTimeout = timeout
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RedisConsumer) SetMaxRetryCount(retryCount int64) error {
|
|
if err := checkConsumerStateTransition(c.state, consumerStateSetup); err != nil {
|
|
return err
|
|
}
|
|
|
|
if retryCount < 1 || retryCount > MaxRetryCount {
|
|
return fmt.Errorf("max retry count has to be between 1 and %d (inclusive)", MaxRetryCount)
|
|
}
|
|
|
|
c.maxRetryCount = retryCount
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RedisConsumer) Start(ctx context.Context) error {
|
|
if err := checkConsumerStateTransition(c.state, consumerStateStarted); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(c.streams) == 0 {
|
|
return errors.New("no streams registered")
|
|
}
|
|
|
|
var err error
|
|
|
|
// Check if Redis is accessible, fail if it's not.
|
|
err = c.rdb.Ping(ctx).Err()
|
|
if err != nil && !errors.Is(err, redis.Nil) {
|
|
return fmt.Errorf("failed to ping redis server: %w", err)
|
|
}
|
|
|
|
// Create consumer group for all streams, creates streams if they don't exist.
|
|
err = c.createGroupForAllStreams(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// update state to started before starting go routines (can't error out from here)
|
|
c.state = consumerStateStarted
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
c.removeStaleConsumers(ctx, time.Hour)
|
|
// launch redis reader, it will finish when the ctx is done
|
|
c.reader(ctx)
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
// launch redis message reclaimer, it will finish when the ctx is done
|
|
c.reclaimer(ctx, time.Minute)
|
|
}()
|
|
|
|
for i := 0; i < c.concurrency; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
// launch redis message consumer, it will finish when the ctx is done
|
|
c.consumer(ctx)
|
|
}()
|
|
}
|
|
|
|
go func() {
|
|
// wait for all go routines to complete
|
|
wg.Wait()
|
|
|
|
// update state to finished
|
|
c.state = consumerStateFinished
|
|
|
|
// close all channels
|
|
close(c.messageQueue)
|
|
close(c.errorCh)
|
|
close(c.infoCh)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// reader method reads a Redis stream with XREADGROUP command to retrieve messages.
|
|
// The messages are then sent to a go channel passed as parameter for processing.
|
|
// If the stream already contains unassigned messages, those we'll be returned.
|
|
// Otherwise XREADGROUP blocks until either a new message arrives or block timeout happens.
|
|
// The method terminates when the provided context finishes.
|
|
//
|
|
//nolint:funlen,gocognit // refactor if needed
|
|
func (c *RedisConsumer) reader(ctx context.Context) {
|
|
delays := []time.Duration{1 * time.Millisecond, 5 * time.Second, 15 * time.Second, 30 * time.Second, time.Minute}
|
|
consecutiveFailures := 0
|
|
|
|
// pre-generate streams argument for XReadGroup
|
|
// NOTE: for the first call ever we want to get the history of the consumer (to allow for seamless restarts)
|
|
// ASSUMPTION: only one consumer with a given groupName+consumerName is running at a time
|
|
scanHistory := true
|
|
streamLen := len(c.streams)
|
|
streamsArg := make([]string, 2*streamLen)
|
|
i := 0
|
|
for streamID := range c.streams {
|
|
streamsArg[i] = streamID
|
|
streamsArg[streamLen+i] = "0"
|
|
i++
|
|
}
|
|
|
|
for {
|
|
var delay time.Duration
|
|
if consecutiveFailures < len(delays) {
|
|
delay = delays[consecutiveFailures]
|
|
} else {
|
|
delay = delays[len(delays)-1]
|
|
}
|
|
readTimer := time.NewTimer(delay)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
readTimer.Stop()
|
|
return
|
|
|
|
case <-readTimer.C:
|
|
const count = 100
|
|
|
|
resReadStream, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
|
|
Group: c.groupName,
|
|
Consumer: c.consumerName,
|
|
Streams: streamsArg,
|
|
Count: count,
|
|
Block: 5 * time.Minute,
|
|
}).Result()
|
|
|
|
// if context is canceled, continue and next iteration will exit cleanly
|
|
if errors.Is(err, context.Canceled) {
|
|
continue
|
|
}
|
|
|
|
// network timeout - log it and retry
|
|
var errNet net.Error
|
|
if ok := errors.As(err, &errNet); ok && errNet.Timeout() {
|
|
c.pushError(fmt.Errorf("encountered network failure: %w", errNet))
|
|
consecutiveFailures++
|
|
continue
|
|
}
|
|
|
|
// group doesn't exist anymore - recreate it
|
|
if err != nil && strings.HasPrefix(err.Error(), "NOGROUP") {
|
|
cErr := c.createGroupForAllStreams(ctx)
|
|
if cErr != nil {
|
|
c.pushError(fmt.Errorf("failed to re-create group for at least one stream: %w", err))
|
|
consecutiveFailures++
|
|
} else {
|
|
c.pushInfo(fmt.Sprintf("re-created group for all streams where it got removed, original error: %s",
|
|
err))
|
|
consecutiveFailures = 0
|
|
}
|
|
continue
|
|
}
|
|
|
|
// any other error we handle generically
|
|
if err != nil && !errors.Is(err, redis.Nil) {
|
|
consecutiveFailures++
|
|
c.pushError(fmt.Errorf("failed to read redis streams %v (consecutive fails: %d): %w",
|
|
streamsArg, consecutiveFailures, err))
|
|
continue
|
|
}
|
|
|
|
// check if we are done with scanning the history of all streams
|
|
if scanHistory {
|
|
scanHistory = false
|
|
|
|
// Getting history always returns all streams in the same order as queried
|
|
// (even a stream that doesn't have any history left, in that case redis returns an empty slice)
|
|
// Thus, we can use a simple incrementing index to get the streamArg for a stream in the response
|
|
x := 0
|
|
for _, stream := range resReadStream {
|
|
// If the stream had messages in the history, continue scanning after the latest read message.
|
|
if len(stream.Messages) > 0 {
|
|
scanHistory = true
|
|
streamsArg[streamLen+x] = stream.Messages[len(stream.Messages)-1].ID
|
|
}
|
|
x++
|
|
}
|
|
|
|
if !scanHistory {
|
|
c.pushInfo("completed scan of history")
|
|
|
|
// Update stream args to read latest messages for all streams
|
|
for j := 0; j < streamLen; j++ {
|
|
streamsArg[streamLen+j] = ">"
|
|
}
|
|
|
|
continue
|
|
}
|
|
}
|
|
|
|
// reset fail count
|
|
consecutiveFailures = 0
|
|
|
|
// if no messages were read we can skip iteration
|
|
if len(resReadStream) == 0 {
|
|
continue
|
|
}
|
|
|
|
// retrieve all messages across all streams and put them into the message queue
|
|
for _, stream := range resReadStream {
|
|
for _, m := range stream.Messages {
|
|
c.messageQueue <- message{
|
|
streamID: stream.Stream,
|
|
id: m.ID,
|
|
values: m.Values,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// reclaimer periodically inspects pending messages with XPENDING command.
|
|
// If a message sits longer than processingTimeout, we attempt to reclaim the message for this consumer
|
|
// and enqueue it for processing.
|
|
//
|
|
//nolint:funlen,gocognit // refactor if needed
|
|
func (c *RedisConsumer) reclaimer(ctx context.Context, reclaimInterval time.Duration) {
|
|
reclaimTimer := time.NewTimer(reclaimInterval)
|
|
defer func() {
|
|
reclaimTimer.Stop()
|
|
}()
|
|
|
|
const (
|
|
baseCount = 16
|
|
maxCount = 1024
|
|
)
|
|
|
|
// the minimum message ID which we are querying for.
|
|
// redis treats "-" as smaller than any valid message ID
|
|
start := "-"
|
|
// the maximum message ID which we are querying for.
|
|
// redis treats "+" as bigger than any valid message ID
|
|
end := "+"
|
|
count := baseCount
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-reclaimTimer.C:
|
|
for streamID := range c.streams {
|
|
resPending, errPending := c.rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
|
|
Stream: streamID,
|
|
Group: c.groupName,
|
|
Start: start,
|
|
End: end,
|
|
Idle: c.processingTimeout,
|
|
Count: int64(count),
|
|
}).Result()
|
|
if errPending != nil && !errors.Is(errPending, redis.Nil) {
|
|
c.pushError(fmt.Errorf("failed to fetch pending messages: %w", errPending))
|
|
reclaimTimer.Reset(reclaimInterval)
|
|
break
|
|
}
|
|
|
|
if len(resPending) == 0 {
|
|
reclaimTimer.Reset(reclaimInterval)
|
|
break
|
|
}
|
|
|
|
// It's safe to change start of the requested range for the next iteration to oldest message.
|
|
start = resPending[0].ID
|
|
|
|
for _, resMessage := range resPending {
|
|
if resMessage.RetryCount > c.maxRetryCount {
|
|
// Retry count gets increased after every XCLAIM.
|
|
// Large retry count might mean there is something wrong with the message, so we'll XACK it.
|
|
// WARNING this will discard the message!
|
|
errAck := c.rdb.XAck(ctx, streamID, c.groupName, resMessage.ID).Err()
|
|
if errAck != nil {
|
|
c.pushError(fmt.Errorf(
|
|
"failed to force acknowledge (discard) message '%s' (Retries: %d) in stream '%s': %w",
|
|
resMessage.ID, resMessage.RetryCount, streamID, errAck))
|
|
} else {
|
|
c.pushError(fmt.Errorf(
|
|
"force acknowledged (discarded) message '%s' (Retries: %d) in stream '%s'",
|
|
resMessage.ID, resMessage.RetryCount, streamID))
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Otherwise, claim the message so we can retry it.
|
|
claimedMessages, errClaim := c.rdb.XClaim(ctx, &redis.XClaimArgs{
|
|
Stream: streamID,
|
|
Group: c.groupName,
|
|
Consumer: c.consumerName,
|
|
MinIdle: c.processingTimeout,
|
|
Messages: []string{resMessage.ID},
|
|
}).Result()
|
|
|
|
if errors.Is(errClaim, redis.Nil) {
|
|
// Receiving redis.Nil here means the message is removed from the stream (because of MAXLEN).
|
|
// The only option is to acknowledge it with XACK.
|
|
errAck := c.rdb.XAck(ctx, streamID, c.groupName, resMessage.ID).Err()
|
|
if errAck != nil {
|
|
c.pushError(fmt.Errorf("failed to acknowledge failed message '%s' in stream '%s': %w",
|
|
resMessage.ID, streamID, errAck))
|
|
} else {
|
|
c.pushInfo(fmt.Sprintf("acknowledged failed message '%s' in stream '%s'",
|
|
resMessage.ID, streamID))
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if errClaim != nil {
|
|
// This can happen if two consumers try to claim the same message at once.
|
|
// One would succeed and the other will get an error.
|
|
c.pushError(fmt.Errorf("failed to claim message '%s' in stream '%s': %w",
|
|
resMessage.ID, streamID, errClaim))
|
|
|
|
continue
|
|
}
|
|
|
|
// This is not expected to happen (message will be retried or eventually discarded)
|
|
if len(claimedMessages) == 0 {
|
|
c.pushError(fmt.Errorf(
|
|
"no error when claiming message '%s' in stream '%s', but redis returned no message",
|
|
resMessage.ID, streamID))
|
|
|
|
continue
|
|
}
|
|
|
|
// we claimed only one message id so there is only one message in the slice
|
|
claimedMessage := claimedMessages[0]
|
|
c.messageQueue <- message{
|
|
streamID: streamID,
|
|
id: claimedMessage.ID,
|
|
values: claimedMessage.Values,
|
|
}
|
|
}
|
|
|
|
// If number of messages that we got is equal to the number that we requested
|
|
// it means that there's a lot for processing, so we'll increase number of messages
|
|
// that we'll pull in the next iteration.
|
|
if len(resPending) == count {
|
|
count *= 2
|
|
if count > maxCount {
|
|
count = maxCount
|
|
}
|
|
} else {
|
|
count = baseCount
|
|
}
|
|
|
|
reclaimTimer.Reset(reclaimInterval)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// consumer method consumes messages coming from Redis. The method terminates when messageQueue channel closes.
|
|
func (c *RedisConsumer) consumer(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case m := <-c.messageQueue:
|
|
if m.id == "" {
|
|
// id should never be empty, if it is then the channel is closed
|
|
return
|
|
}
|
|
|
|
fn, ok := c.streams[m.streamID]
|
|
if !ok {
|
|
// we don't want to ack the message
|
|
// maybe someone else can claim and process it (worst case it expires)
|
|
c.pushError(fmt.Errorf("received message '%s' in stream '%s' that doesn't belong to us, skip",
|
|
m.id, m.streamID))
|
|
continue
|
|
}
|
|
|
|
err := func() (err error) {
|
|
// Ensure that handlers don't cause panic.
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
c.pushError(fmt.Errorf("PANIC when processing message '%s' in stream '%s':\n%s",
|
|
m.id, m.streamID, debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
return fn(ctx, m.id, m.values)
|
|
}()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf("failed to process message '%s' in stream '%s': %w", m.id, m.streamID, err))
|
|
continue
|
|
}
|
|
|
|
err = c.rdb.XAck(ctx, m.streamID, c.groupName, m.id).Err()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf("failed to acknowledge message '%s' in stream '%s': %w", m.id, m.streamID, err))
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) removeStaleConsumers(ctx context.Context, maxAge time.Duration) {
|
|
for streamID := range c.streams {
|
|
// Fetch all consumers for this stream and group.
|
|
resConsumers, err := c.rdb.XInfoConsumers(ctx, streamID, c.groupName).Result()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf("failed to read consumers for stream '%s': %w", streamID, err))
|
|
return
|
|
}
|
|
|
|
// Delete old consumers, but only if they don't have pending messages.
|
|
for _, resConsumer := range resConsumers {
|
|
age := time.Duration(resConsumer.Idle) * time.Millisecond
|
|
if resConsumer.Pending > 0 || age < maxAge {
|
|
continue
|
|
}
|
|
|
|
err = c.rdb.XGroupDelConsumer(ctx, streamID, c.groupName, resConsumer.Name).Err()
|
|
if err != nil {
|
|
c.pushError(fmt.Errorf(
|
|
"failed to remove stale consumer '%s' from group '%s' (age '%s') for stream '%s': %w",
|
|
resConsumer.Name, c.groupName, age, streamID, err))
|
|
continue
|
|
}
|
|
|
|
c.pushInfo(fmt.Sprintf("removed stale consumer '%s' from group '%s' (age '%s') for stream '%s'",
|
|
resConsumer.Name, c.groupName, age, streamID))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) pushError(err error) {
|
|
select {
|
|
case c.errorCh <- err:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) pushInfo(s string) {
|
|
select {
|
|
case c.infoCh <- s:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (c *RedisConsumer) Errors() <-chan error { return c.errorCh }
|
|
func (c *RedisConsumer) Infos() <-chan string { return c.infoCh }
|
|
|
|
func (c *RedisConsumer) createGroupForAllStreams(ctx context.Context) error {
|
|
for streamID := range c.streams {
|
|
err := createGroup(ctx, c.rdb, streamID, c.groupName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func createGroup(ctx context.Context, rdb redis.UniversalClient, streamID string, groupName string) error {
|
|
// Creates a new consumer group that starts receiving messages from now on.
|
|
// Existing messges in the queue are ignored (we don't want to overload a group with old messages)
|
|
// For more details of the XGROUPCREATE api see https://redis.io/commands/xgroup-create/
|
|
err := rdb.XGroupCreateMkStream(ctx, streamID, groupName, "$").Err()
|
|
if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
|
|
return fmt.Errorf("failed to create consumer group '%s' for stream '%s': %w", groupName, streamID, err)
|
|
}
|
|
|
|
return nil
|
|
}
|