mirror of
https://github.com/harness/drone.git
synced 2025-05-03 22:32:45 +08:00
retry improved
This commit is contained in:
parent
209475397b
commit
b15054ea76
@ -139,58 +139,70 @@ type inMemMutex struct {
|
||||
}
|
||||
|
||||
// Key returns the key to be locked.
|
||||
func (l *inMemMutex) Key() string {
|
||||
return l.key
|
||||
func (m *inMemMutex) Key() string {
|
||||
return m.key
|
||||
}
|
||||
|
||||
// Lock acquires the lock. It fails with error if the lock is already held.
|
||||
func (l *inMemMutex) Lock(ctx context.Context) error {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
func (m *inMemMutex) Lock(ctx context.Context) error {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if l.isHeld {
|
||||
return NewError(LockHeld, l.key, nil)
|
||||
if m.isHeld {
|
||||
return NewError(LockHeld, m.key, nil)
|
||||
}
|
||||
|
||||
if l.provider.acquire(l.key, l.token, l.expiry) {
|
||||
l.isHeld = true
|
||||
if m.provider.acquire(m.key, m.token, m.expiry) {
|
||||
m.isHeld = true
|
||||
return nil
|
||||
}
|
||||
|
||||
timeout := time.NewTimer(l.waitTime)
|
||||
timeout := time.NewTimer(m.waitTime)
|
||||
defer timeout.Stop()
|
||||
|
||||
delayTimer := time.NewTimer(time.Hour)
|
||||
delayTimer.Stop()
|
||||
defer delayTimer.Stop()
|
||||
|
||||
for i := 1; i <= l.tries; i++ {
|
||||
delayTimer.Reset(l.delayFunc(i))
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return NewError(Context, l.key, ctx.Err())
|
||||
case <-timeout.C:
|
||||
return NewError(CannotLock, l.key, nil)
|
||||
case <-delayTimer.C:
|
||||
if l.provider.acquire(l.key, l.token, l.expiry) {
|
||||
l.isHeld = true
|
||||
return nil
|
||||
}
|
||||
for i := 1; !m.isHeld && i <= m.tries; i++ {
|
||||
if err := m.retry(ctx, i, timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return NewError(MaxRetriesExceeded, l.key, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *inMemMutex) retry(ctx context.Context, attempt int, timeout *time.Timer) error {
|
||||
if m.isHeld {
|
||||
return nil
|
||||
}
|
||||
if attempt == m.tries {
|
||||
return NewError(MaxRetriesExceeded, m.key, nil)
|
||||
}
|
||||
|
||||
delay := time.NewTimer(m.delayFunc(attempt))
|
||||
defer delay.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return NewError(Context, m.key, ctx.Err())
|
||||
case <-timeout.C:
|
||||
return NewError(CannotLock, m.key, nil)
|
||||
case <-delay.C: // just wait
|
||||
}
|
||||
|
||||
if m.provider.acquire(m.key, m.token, m.expiry) {
|
||||
m.isHeld = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unlock releases the lock. It fails with error if the lock is not currently held.
|
||||
func (l *inMemMutex) Unlock(_ context.Context) error {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
func (m *inMemMutex) Unlock(_ context.Context) error {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if !l.isHeld || !l.provider.release(l.key, l.token) {
|
||||
return NewError(LockNotHeld, l.key, nil)
|
||||
if !m.isHeld || !m.provider.release(m.key, m.token) {
|
||||
return NewError(LockNotHeld, m.key, nil)
|
||||
}
|
||||
|
||||
l.isHeld = false
|
||||
m.isHeld = false
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ package lock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -31,6 +32,7 @@ func Test_inMemMutex_Lock(t *testing.T) {
|
||||
}
|
||||
if err := mx.Lock(context.Background()); err != nil {
|
||||
t.Errorf("error from go routine while locking %s, err: %v", mx.Key(), err)
|
||||
return
|
||||
}
|
||||
mx.Unlock(context.Background())
|
||||
}()
|
||||
@ -48,6 +50,54 @@ func Test_inMemMutex_Lock(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func Test_inMemMutex_MaxTries(t *testing.T) {
|
||||
manager := NewInMemory(Config{
|
||||
App: "gitness",
|
||||
Namespace: "pullreq",
|
||||
Expiry: 1 * time.Second,
|
||||
Tries: 2,
|
||||
RetryDelay: 300 * time.Millisecond,
|
||||
})
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
mx, err := manager.NewMutex("key1")
|
||||
if err != nil {
|
||||
t.Errorf("mutex not created, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = mx.Lock(context.Background())
|
||||
if err == nil {
|
||||
t.Errorf("error should be returned while locking %s instead of nil", mx.Key())
|
||||
return
|
||||
}
|
||||
var errLock *Error
|
||||
if !errors.As(err, &errLock) {
|
||||
t.Errorf("expected error lock.Error, got: %v", err)
|
||||
return
|
||||
}
|
||||
if errLock.Kind != MaxRetriesExceeded {
|
||||
t.Errorf("expected lock.MaxRetriesExceeded, got: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
mx, err := manager.NewMutex("key1")
|
||||
if err != nil {
|
||||
t.Errorf("mutex not created, err: %v", err)
|
||||
return
|
||||
}
|
||||
if err := mx.Lock(context.Background()); err != nil {
|
||||
t.Errorf("error while locking %v, err: %v", mx.Key(), err)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
mx.Unlock(context.Background())
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func Test_inMemMutex_LockAndWait(t *testing.T) {
|
||||
wg := &sync.WaitGroup{}
|
||||
manager := NewInMemory(Config{
|
||||
|
Loading…
Reference in New Issue
Block a user