diff --git a/lock/memory.go b/lock/memory.go index e7e9ef2bf..8c4c449a3 100644 --- a/lock/memory.go +++ b/lock/memory.go @@ -139,58 +139,70 @@ type inMemMutex struct { } // Key returns the key to be locked. -func (l *inMemMutex) Key() string { - return l.key +func (m *inMemMutex) Key() string { + return m.key } // Lock acquires the lock. It fails with error if the lock is already held. -func (l *inMemMutex) Lock(ctx context.Context) error { - l.mutex.Lock() - defer l.mutex.Unlock() +func (m *inMemMutex) Lock(ctx context.Context) error { + m.mutex.Lock() + defer m.mutex.Unlock() - if l.isHeld { - return NewError(LockHeld, l.key, nil) + if m.isHeld { + return NewError(LockHeld, m.key, nil) } - if l.provider.acquire(l.key, l.token, l.expiry) { - l.isHeld = true + if m.provider.acquire(m.key, m.token, m.expiry) { + m.isHeld = true return nil } - timeout := time.NewTimer(l.waitTime) + timeout := time.NewTimer(m.waitTime) defer timeout.Stop() - delayTimer := time.NewTimer(time.Hour) - delayTimer.Stop() - defer delayTimer.Stop() - - for i := 1; i <= l.tries; i++ { - delayTimer.Reset(l.delayFunc(i)) - select { - case <-ctx.Done(): - return NewError(Context, l.key, ctx.Err()) - case <-timeout.C: - return NewError(CannotLock, l.key, nil) - case <-delayTimer.C: - if l.provider.acquire(l.key, l.token, l.expiry) { - l.isHeld = true - return nil - } + for i := 1; !m.isHeld && i <= m.tries; i++ { + if err := m.retry(ctx, i, timeout); err != nil { + return err } } - return NewError(MaxRetriesExceeded, l.key, nil) + return nil +} + +func (m *inMemMutex) retry(ctx context.Context, attempt int, timeout *time.Timer) error { + if m.isHeld { + return nil + } + if attempt == m.tries { + return NewError(MaxRetriesExceeded, m.key, nil) + } + + delay := time.NewTimer(m.delayFunc(attempt)) + defer delay.Stop() + + select { + case <-ctx.Done(): + return NewError(Context, m.key, ctx.Err()) + case <-timeout.C: + return NewError(CannotLock, m.key, nil) + case <-delay.C: // just wait + } + + if m.provider.acquire(m.key, m.token, m.expiry) { + m.isHeld = true + } + return nil } // Unlock releases the lock. It fails with error if the lock is not currently held. -func (l *inMemMutex) Unlock(_ context.Context) error { - l.mutex.Lock() - defer l.mutex.Unlock() +func (m *inMemMutex) Unlock(_ context.Context) error { + m.mutex.Lock() + defer m.mutex.Unlock() - if !l.isHeld || !l.provider.release(l.key, l.token) { - return NewError(LockNotHeld, l.key, nil) + if !m.isHeld || !m.provider.release(m.key, m.token) { + return NewError(LockNotHeld, m.key, nil) } - l.isHeld = false + m.isHeld = false return nil } diff --git a/lock/memory_test.go b/lock/memory_test.go index 9023ea3af..0566ee28f 100644 --- a/lock/memory_test.go +++ b/lock/memory_test.go @@ -6,6 +6,7 @@ package lock import ( "context" + "errors" "sync" "testing" "time" @@ -31,6 +32,7 @@ func Test_inMemMutex_Lock(t *testing.T) { } if err := mx.Lock(context.Background()); err != nil { t.Errorf("error from go routine while locking %s, err: %v", mx.Key(), err) + return } mx.Unlock(context.Background()) }() @@ -48,6 +50,54 @@ func Test_inMemMutex_Lock(t *testing.T) { wg.Wait() } +func Test_inMemMutex_MaxTries(t *testing.T) { + manager := NewInMemory(Config{ + App: "gitness", + Namespace: "pullreq", + Expiry: 1 * time.Second, + Tries: 2, + RetryDelay: 300 * time.Millisecond, + }) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(500 * time.Millisecond) + mx, err := manager.NewMutex("key1") + if err != nil { + t.Errorf("mutex not created, err: %v", err) + return + } + + err = mx.Lock(context.Background()) + if err == nil { + t.Errorf("error should be returned while locking %s instead of nil", mx.Key()) + return + } + var errLock *Error + if !errors.As(err, &errLock) { + t.Errorf("expected error lock.Error, got: %v", err) + return + } + if errLock.Kind != MaxRetriesExceeded { + t.Errorf("expected lock.MaxRetriesExceeded, got: %v", err) + return + } + }() + + mx, err := manager.NewMutex("key1") + if err != nil { + t.Errorf("mutex not created, err: %v", err) + return + } + if err := mx.Lock(context.Background()); err != nil { + t.Errorf("error while locking %v, err: %v", mx.Key(), err) + } + time.Sleep(1 * time.Second) + mx.Unlock(context.Background()) + wg.Wait() +} + func Test_inMemMutex_LockAndWait(t *testing.T) { wg := &sync.WaitGroup{} manager := NewInMemory(Config{