fix: [CDE-484]: Changing the behaviour of the inmemory stream consumer to retry the message after timeout. (#2993)

* fix: [CDE-484]: Addressing review comments.
* fix: [CDE-484]: Changing the behaviour of the inmemory stream consumer to retry the message after timeout.
This commit is contained in:
Dhruv Dhruv 2024-11-18 05:33:40 +00:00 committed by Harness
parent 17dea68b57
commit e7ce1cd7b5

View File

@ -187,7 +187,27 @@ func (c *MemoryConsumer) consume(ctx context.Context) {
continue continue
} }
err := func() (err error) { c.processMessage(ctx, handler, m)
}
}
}
func (c *MemoryConsumer) processMessage(ctx context.Context, handler handler, m memoryMessage) {
var handlingErr error
ctxWithCancel, cancelFn := context.WithCancel(ctx)
defer func(err error) {
// If the original execution errors out, we rely on the timeout to retry. This is to keep the behaviour same
// as the redis consumer.
if err == nil {
cancelFn()
}
}(handlingErr)
// Start a retry goroutine with `idleTimeout` delay
go c.retryPostTimeout(ctxWithCancel, handler, m)
handlingErr = func() (err error) {
// Ensure that handlers don't cause panic. // Ensure that handlers don't cause panic.
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -199,15 +219,32 @@ func (c *MemoryConsumer) consume(ctx context.Context) {
return handler.handle(ctx, m.id, m.values) return handler.handle(ctx, m.id, m.values)
}() }()
if err != nil { if handlingErr != nil {
c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w", c.pushError(fmt.Errorf("failed to process message with id '%s' in stream '%s' (retries: %d): %w",
m.id, m.streamID, m.retries, err)) m.id, m.streamID, m.retries, handlingErr))
}
}
if m.retries >= int64(handler.config.maxRetries) { func (c *MemoryConsumer) retryPostTimeout(ctxWithCancel context.Context, handler handler, m memoryMessage) {
c.pushError(fmt.Errorf( timer := time.NewTimer(handler.config.idleTimeout)
"discard message with id '%s' from stream '%s' - failed %d retries", defer timer.Stop()
select {
case <-timer.C:
c.retryMessage(m, handler.config.maxRetries)
case <-ctxWithCancel.Done():
// Retry canceled if message is processed
// Drain the timer channel if it is already stopped
if !timer.Stop() {
<-timer.C
}
}
}
func (c *MemoryConsumer) retryMessage(m memoryMessage, maxRetries int) {
if m.retries >= int64(maxRetries) {
c.pushError(fmt.Errorf("discard message with id '%s' from stream '%s' - failed %d retries",
m.id, m.streamID, m.retries)) m.id, m.streamID, m.retries))
continue return
} }
// increase retry count // increase retry count
@ -217,13 +254,9 @@ func (c *MemoryConsumer) consume(ctx context.Context) {
// IMPORTANT: this won't requeue to broker, only in this consumer's queue! // IMPORTANT: this won't requeue to broker, only in this consumer's queue!
go func() { go func() {
// TODO: linear/exponential backoff relative to retry count might be good // TODO: linear/exponential backoff relative to retry count might be good
time.Sleep(handler.config.idleTimeout)
c.messageQueue <- m c.messageQueue <- m
}() }()
} }
}
}
}
func (c *MemoryConsumer) Errors() <-chan error { return c.errorCh } func (c *MemoryConsumer) Errors() <-chan error { return c.errorCh }
func (c *MemoryConsumer) Infos() <-chan string { return c.infoCh } func (c *MemoryConsumer) Infos() <-chan string { return c.infoCh }